blob: a3dec75e2bbda6f16d0ad0a5c92403267e17cb2b [file] [log] [blame]
khenaidoo26721882021-08-11 17:42:52 -04001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package grpc
17
18import (
19 "context"
20 "fmt"
21 "reflect"
22 "strings"
23 "sync"
24 "time"
25
26 grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
27 grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
28 "github.com/opencord/voltha-lib-go/v7/pkg/log"
29 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
30 "github.com/opencord/voltha-protos/v5/go/adapter_services"
31 "github.com/opencord/voltha-protos/v5/go/core"
32 "google.golang.org/grpc"
33 "google.golang.org/grpc/keepalive"
34)
35
36type event byte
37type state byte
38type SetAndTestServiceHandler func(context.Context, *grpc.ClientConn) interface{}
39type RestartedHandler func(ctx context.Context, endPoint string) error
40
41type contextKey string
42
43func (c contextKey) String() string {
44 return string(c)
45}
46
47var (
48 grpcMonitorContextKey = contextKey("grpc-monitor")
49)
50
51const (
52 grpcBackoffInitialInterval = "GRPC_BACKOFF_INITIAL_INTERVAL"
53 grpcBackoffMaxInterval = "GRPC_BACKOFF_MAX_INTERVAL"
54 grpcBackoffMaxElapsedTime = "GRPC_BACKOFF_MAX_ELAPSED_TIME"
55 grpcMonitorInterval = "GRPC_MONITOR_INTERVAL"
56)
57
58const (
59 DefaultBackoffInitialInterval = 100 * time.Millisecond
60 DefaultBackoffMaxInterval = 5 * time.Second
61 DefaultBackoffMaxElapsedTime = 0 * time.Second // No time limit
62 DefaultGRPCMonitorInterval = 5 * time.Second
63)
64
65const (
66 connectionErrorSubString = "SubConns are in TransientFailure"
67 connectionClosedSubstring = "client connection is closing"
68 connectionError = "connection error"
69 connectionSystemNotReady = "system is not ready"
70)
71
72const (
73 eventConnecting = event(iota)
74 eventConnected
75 eventDisconnected
76 eventStopped
77 eventError
78
79 stateConnected = state(iota)
80 stateConnecting
81 stateDisconnected
82)
83
84type Client struct {
85 apiEndPoint string
86 connection *grpc.ClientConn
87 connectionLock sync.RWMutex
88 stateLock sync.RWMutex
89 state state
90 service interface{}
91 events chan event
92 onRestart RestartedHandler
93 backoffInitialInterval time.Duration
94 backoffMaxInterval time.Duration
95 backoffMaxElapsedTime time.Duration
96 activityCheck bool
97 monitorInterval time.Duration
98 activeCh chan struct{}
99 activeChMutex sync.RWMutex
100 done bool
101 livenessCallback func(timestamp time.Time)
102}
103
104type ClientOption func(*Client)
105
106func ActivityCheck(enable bool) ClientOption {
107 return func(args *Client) {
108 args.activityCheck = enable
109 }
110}
111
112func NewClient(endpoint string, onRestart RestartedHandler, opts ...ClientOption) (*Client, error) {
113 c := &Client{
114 apiEndPoint: endpoint,
115 onRestart: onRestart,
116 events: make(chan event, 1),
117 state: stateDisconnected,
118 backoffInitialInterval: DefaultBackoffInitialInterval,
119 backoffMaxInterval: DefaultBackoffMaxInterval,
120 backoffMaxElapsedTime: DefaultBackoffMaxElapsedTime,
121 monitorInterval: DefaultGRPCMonitorInterval,
122 }
123 for _, option := range opts {
124 option(c)
125 }
126
127 // Check for environment variables
128 if err := SetFromEnvVariable(grpcBackoffInitialInterval, &c.backoffInitialInterval); err != nil {
129 logger.Warnw(context.Background(), "failure-reading-env-variable", log.Fields{"error": err, "variable": grpcBackoffInitialInterval})
130 }
131
132 if err := SetFromEnvVariable(grpcBackoffMaxInterval, &c.backoffMaxInterval); err != nil {
133 logger.Warnw(context.Background(), "failure-reading-env-variable", log.Fields{"error": err, "variable": grpcBackoffMaxInterval})
134 }
135
136 if err := SetFromEnvVariable(grpcBackoffMaxElapsedTime, &c.backoffMaxElapsedTime); err != nil {
137 logger.Warnw(context.Background(), "failure-reading-env-variable", log.Fields{"error": err, "variable": grpcBackoffMaxElapsedTime})
138 }
139
140 if err := SetFromEnvVariable(grpcMonitorInterval, &c.monitorInterval); err != nil {
141 logger.Warnw(context.Background(), "failure-reading-env-variable", log.Fields{"error": err, "variable": grpcMonitorInterval})
142 }
143
144 logger.Infow(context.Background(), "initialized-client", log.Fields{"client": c})
145
146 // Sanity check
147 if c.backoffInitialInterval > c.backoffMaxInterval {
148 return nil, fmt.Errorf("initial retry delay %v is greater than maximum retry delay %v", c.backoffInitialInterval, c.backoffMaxInterval)
149 }
150
151 return c, nil
152}
153
154func (c *Client) GetClient() (interface{}, error) {
155 c.connectionLock.RLock()
156 defer c.connectionLock.RUnlock()
157 if c.service == nil {
158 return nil, fmt.Errorf("no connection to %s", c.apiEndPoint)
159 }
160 return c.service, nil
161}
162
163// GetCoreServiceClient is a helper function that returns a concrete service instead of the GetClient() API
164// which returns an interface
165func (c *Client) GetCoreServiceClient() (core.CoreServiceClient, error) {
166 c.connectionLock.RLock()
167 defer c.connectionLock.RUnlock()
168 if c.service == nil {
169 return nil, fmt.Errorf("no core connection to %s", c.apiEndPoint)
170 }
171 client, ok := c.service.(core.CoreServiceClient)
172 if ok {
173 return client, nil
174 }
175 return nil, fmt.Errorf("invalid-service-%s", reflect.TypeOf(c.service))
176}
177
178// GetOnuAdapterServiceClient is a helper function that returns a concrete service instead of the GetClient() API
179// which returns an interface
180func (c *Client) GetOnuInterAdapterServiceClient() (adapter_services.OnuInterAdapterServiceClient, error) {
181 c.connectionLock.RLock()
182 defer c.connectionLock.RUnlock()
183 if c.service == nil {
184 return nil, fmt.Errorf("no child adapter connection to %s", c.apiEndPoint)
185 }
186 client, ok := c.service.(adapter_services.OnuInterAdapterServiceClient)
187 if ok {
188 return client, nil
189 }
190 return nil, fmt.Errorf("invalid-service-%s", reflect.TypeOf(c.service))
191}
192
193// GetOltAdapterServiceClient is a helper function that returns a concrete service instead of the GetClient() API
194// which returns an interface
195func (c *Client) GetOltInterAdapterServiceClient() (adapter_services.OltInterAdapterServiceClient, error) {
196 c.connectionLock.RLock()
197 defer c.connectionLock.RUnlock()
198 if c.service == nil {
199 return nil, fmt.Errorf("no parent adapter connection to %s", c.apiEndPoint)
200 }
201 client, ok := c.service.(adapter_services.OltInterAdapterServiceClient)
202 if ok {
203 return client, nil
204 }
205 return nil, fmt.Errorf("invalid-service-%s", reflect.TypeOf(c.service))
206}
207
208func (c *Client) Reset(ctx context.Context) {
209 logger.Debugw(ctx, "resetting-client-connection", log.Fields{"endpoint": c.apiEndPoint})
210 c.stateLock.Lock()
211 defer c.stateLock.Unlock()
212 if c.state == stateConnected {
213 c.state = stateDisconnected
214 c.events <- eventDisconnected
215 }
216}
217
218func (c *Client) clientInterceptor(ctx context.Context, method string, req interface{}, reply interface{},
219 cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
220 // Nothing to do before intercepting the call
221 err := invoker(ctx, method, req, reply, cc, opts...)
222 // On connection failure, start the reconnect process depending on the error response
223 if err != nil {
224 logger.Errorw(ctx, "received-error", log.Fields{"error": err, "context": ctx, "endpoint": c.apiEndPoint})
225 if strings.Contains(err.Error(), connectionErrorSubString) ||
226 strings.Contains(err.Error(), connectionError) ||
227 strings.Contains(err.Error(), connectionSystemNotReady) ||
228 isGrpcMonitorKeyPresentInContext(ctx) {
229 c.stateLock.Lock()
230 if c.state == stateConnected {
khenaidoo26721882021-08-11 17:42:52 -0400231 c.state = stateDisconnected
khenaidooaa290962021-10-22 18:14:33 -0400232 logger.Warnw(context.Background(), "sending-disconnect-event", log.Fields{"endpoint": c.apiEndPoint, "error": err, "curr-state": stateConnected, "new-state": c.state})
khenaidoo26721882021-08-11 17:42:52 -0400233 c.events <- eventDisconnected
234 }
235 c.stateLock.Unlock()
236 } else if strings.Contains(err.Error(), connectionClosedSubstring) {
237 logger.Errorw(context.Background(), "invalid-client-connection-closed", log.Fields{"endpoint": c.apiEndPoint, "error": err})
238 }
239 return err
240 }
241 // Update activity on success only
242 c.updateActivity(ctx)
243 return nil
244}
245
246// updateActivity pushes an activity indication on the channel so that the monitoring routine does not validate
247// the gRPC connection when the connection is being used. Note that this update is done both when the connection
248// is alive or a connection error is returned. A separate routine takes care of doing the re-connect.
249func (c *Client) updateActivity(ctx context.Context) {
250 c.activeChMutex.RLock()
251 defer c.activeChMutex.RUnlock()
252 if c.activeCh != nil {
253 logger.Debugw(ctx, "update-activity", log.Fields{"api-endpoint": c.apiEndPoint})
254 c.activeCh <- struct{}{}
255
256 // Update liveness only in connected state
257 if c.livenessCallback != nil {
258 c.stateLock.RLock()
259 if c.state == stateConnected {
260 c.livenessCallback(time.Now())
261 }
262 c.stateLock.RUnlock()
263 }
264 }
265}
266
267func WithGrpcMonitorContext(ctx context.Context, name string) context.Context {
268 ctx = context.WithValue(ctx, grpcMonitorContextKey, name)
269 return ctx
270}
271
272func isGrpcMonitorKeyPresentInContext(ctx context.Context) bool {
273 if ctx != nil {
274 _, present := ctx.Value(grpcMonitorContextKey).(string)
275 return present
276 }
277 return false
278}
279
280// monitorActivity monitors the activity on the gRPC connection. If there are no activity after a specified
281// timeout, it will send a default API request on that connection. If the connection is good then nothing
282// happens. If it's bad this will trigger reconnection attempts.
283func (c *Client) monitorActivity(ctx context.Context, handler SetAndTestServiceHandler) {
284 logger.Infow(ctx, "start-activity-monitor", log.Fields{"endpoint": c.apiEndPoint})
285
286 // Create an activity monitor channel. Unbuffered channel works well. However, we use a buffered
287 // channel here as a safeguard of having the grpc interceptor publishing too many events that can be
288 // consumed by this monitoring thread
289 c.activeChMutex.Lock()
290 c.activeCh = make(chan struct{}, 10)
291 c.activeChMutex.Unlock()
292
khenaidooaa290962021-10-22 18:14:33 -0400293 grpcMonitorCheckRunning := false
294 var grpcMonitorCheckRunningLock sync.RWMutex
295
khenaidoo26721882021-08-11 17:42:52 -0400296 // Interval to wait for no activity before probing the connection
297 timeout := c.monitorInterval
298loop:
299 for {
300 timeoutTimer := time.NewTimer(timeout)
301 select {
302
303 case <-c.activeCh:
khenaidooaa290962021-10-22 18:14:33 -0400304 logger.Debugw(ctx, "endpoint-reachable", log.Fields{"endpoint": c.apiEndPoint})
khenaidoo26721882021-08-11 17:42:52 -0400305
306 // Reset timer
307 if !timeoutTimer.Stop() {
khenaidooaa290962021-10-22 18:14:33 -0400308 select {
309 case <-timeoutTimer.C:
310 default:
311 }
khenaidoo26721882021-08-11 17:42:52 -0400312 }
313
314 case <-ctx.Done():
315 break loop
316
317 case <-timeoutTimer.C:
318 // Trigger an activity check if the state is connected. If the state is not connected then there is already
319 // a backoff retry mechanism in place to retry establishing connection.
320 c.stateLock.RLock()
khenaidooaa290962021-10-22 18:14:33 -0400321 grpcMonitorCheckRunningLock.RLock()
322 runCheck := (c.state == stateConnected) && !grpcMonitorCheckRunning
323 grpcMonitorCheckRunningLock.RUnlock()
khenaidoo26721882021-08-11 17:42:52 -0400324 c.stateLock.RUnlock()
325 if runCheck {
326 go func() {
khenaidooaa290962021-10-22 18:14:33 -0400327 grpcMonitorCheckRunningLock.Lock()
328 if grpcMonitorCheckRunning {
329 grpcMonitorCheckRunningLock.Unlock()
330 logger.Debugw(ctx, "connection-check-already-in-progress", log.Fields{"api-endpoint": c.apiEndPoint})
331 return
332 }
333 grpcMonitorCheckRunning = true
334 grpcMonitorCheckRunningLock.Unlock()
335
khenaidoo26721882021-08-11 17:42:52 -0400336 logger.Debugw(ctx, "connection-check-start", log.Fields{"api-endpoint": c.apiEndPoint})
337 subCtx, cancel := context.WithTimeout(ctx, c.backoffMaxInterval)
338 defer cancel()
339 subCtx = WithGrpcMonitorContext(subCtx, "grpc-monitor")
340 c.connectionLock.RLock()
341 defer c.connectionLock.RUnlock()
342 if c.connection != nil {
343 response := handler(subCtx, c.connection)
344 logger.Debugw(ctx, "connection-check-response", log.Fields{"api-endpoint": c.apiEndPoint, "up": response != nil})
345 }
khenaidooaa290962021-10-22 18:14:33 -0400346 grpcMonitorCheckRunningLock.Lock()
347 grpcMonitorCheckRunning = false
348 grpcMonitorCheckRunningLock.Unlock()
khenaidoo26721882021-08-11 17:42:52 -0400349 }()
350 }
351 }
352 }
353 logger.Infow(ctx, "activity-monitor-stopping", log.Fields{"endpoint": c.apiEndPoint})
354}
355
356// Start kicks off the adapter agent by trying to connect to the adapter
357func (c *Client) Start(ctx context.Context, handler SetAndTestServiceHandler) {
358 logger.Debugw(ctx, "Starting GRPC - Client", log.Fields{"api-endpoint": c.apiEndPoint})
359
360 // If the context contains a k8s probe then register services
361 p := probe.GetProbeFromContext(ctx)
362 if p != nil {
363 p.RegisterService(ctx, c.apiEndPoint)
364 }
365
366 // Enable activity check, if required
367 if c.activityCheck {
368 go c.monitorActivity(ctx, handler)
369 }
370
371 initialConnection := true
372 c.events <- eventConnecting
373 backoff := NewBackoff(c.backoffInitialInterval, c.backoffMaxInterval, c.backoffMaxElapsedTime)
374 attempt := 1
375loop:
376 for {
377 select {
378 case <-ctx.Done():
379 logger.Debugw(ctx, "context-closing", log.Fields{"endpoint": c.apiEndPoint})
380 return
381 case event := <-c.events:
382 logger.Debugw(ctx, "received-event", log.Fields{"event": event, "endpoint": c.apiEndPoint})
383 switch event {
384 case eventConnecting:
khenaidoo26721882021-08-11 17:42:52 -0400385 c.stateLock.Lock()
khenaidooaa290962021-10-22 18:14:33 -0400386 logger.Debugw(ctx, "connection-start", log.Fields{"endpoint": c.apiEndPoint, "attempts": attempt, "curr-state": c.state})
khenaidoo26721882021-08-11 17:42:52 -0400387 if c.state == stateConnected {
388 c.state = stateDisconnected
389 }
390 if c.state != stateConnecting {
391 c.state = stateConnecting
392 go func() {
393 if err := c.connectToEndpoint(ctx, handler, p); err != nil {
394 c.stateLock.Lock()
395 c.state = stateDisconnected
396 c.stateLock.Unlock()
397 logger.Errorw(ctx, "connection-failed", log.Fields{"endpoint": c.apiEndPoint, "attempt": attempt, "error": err})
398
399 // Retry connection after a delay
400 if err = backoff.Backoff(ctx); err != nil {
401 // Context has closed or reached maximum elapsed time, if set
402 logger.Errorw(ctx, "retry-aborted", log.Fields{"endpoint": c.apiEndPoint, "error": err})
403 return
404 }
405 attempt += 1
406 c.events <- eventConnecting
407 } else {
408 backoff.Reset()
409 }
410 }()
411 }
412 c.stateLock.Unlock()
413
414 case eventConnected:
khenaidoo26721882021-08-11 17:42:52 -0400415 attempt = 1
416 c.stateLock.Lock()
khenaidooaa290962021-10-22 18:14:33 -0400417 logger.Debugw(ctx, "endpoint-connected", log.Fields{"endpoint": c.apiEndPoint, "curr-state": c.state})
khenaidoo26721882021-08-11 17:42:52 -0400418 if c.state != stateConnected {
419 c.state = stateConnected
420 if initialConnection {
421 logger.Debugw(ctx, "initial-endpoint-connection", log.Fields{"endpoint": c.apiEndPoint})
422 initialConnection = false
423 } else {
424 logger.Debugw(ctx, "endpoint-reconnection", log.Fields{"endpoint": c.apiEndPoint})
425 // Trigger any callback on a restart
426 go func() {
427 err := c.onRestart(log.WithSpanFromContext(context.Background(), ctx), c.apiEndPoint)
428 if err != nil {
429 logger.Errorw(ctx, "unable-to-restart-endpoint", log.Fields{"error": err, "endpoint": c.apiEndPoint})
430 }
431 }()
432 }
433 }
434 c.stateLock.Unlock()
435
436 case eventDisconnected:
437 if p != nil {
438 p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusNotReady)
439 }
khenaidooaa290962021-10-22 18:14:33 -0400440 c.stateLock.RLock()
441 logger.Debugw(ctx, "endpoint-disconnected", log.Fields{"endpoint": c.apiEndPoint, "curr-state": c.state})
442 c.stateLock.RUnlock()
khenaidoo26721882021-08-11 17:42:52 -0400443
444 // Try to connect again
445 c.events <- eventConnecting
446
447 case eventStopped:
448 logger.Debugw(ctx, "endPoint-stopped", log.Fields{"adapter": c.apiEndPoint})
449 go func() {
450 if err := c.closeConnection(ctx, p); err != nil {
451 logger.Errorw(ctx, "endpoint-closing-connection-failed", log.Fields{"endpoint": c.apiEndPoint, "error": err})
452 }
453 }()
454 break loop
455 case eventError:
456 logger.Errorw(ctx, "endpoint-error-event", log.Fields{"endpoint": c.apiEndPoint})
457 default:
458 logger.Errorw(ctx, "endpoint-unknown-event", log.Fields{"endpoint": c.apiEndPoint, "error": event})
459 }
460 }
461 }
462 logger.Infow(ctx, "endpoint-stopped", log.Fields{"endpoint": c.apiEndPoint})
463}
464
465func (c *Client) connectToEndpoint(ctx context.Context, handler SetAndTestServiceHandler, p *probe.Probe) error {
466 if p != nil {
467 p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusPreparing)
468 }
469
470 c.connectionLock.Lock()
471 defer c.connectionLock.Unlock()
472
473 if c.connection != nil {
474 _ = c.connection.Close()
475 c.connection = nil
476 }
477
478 c.service = nil
479
480 // Use Interceptors to:
481 // 1. automatically inject
482 // 2. publish Open Tracing Spans by this GRPC Client
483 // 3. detect connection failure on client calls such that the reconnection process can begin
484 conn, err := grpc.Dial(c.apiEndPoint,
485 grpc.WithInsecure(),
486 grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
487 grpc_opentracing.StreamClientInterceptor(grpc_opentracing.WithTracer(log.ActiveTracerProxy{})),
488 )),
489 grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
490 grpc_opentracing.UnaryClientInterceptor(grpc_opentracing.WithTracer(log.ActiveTracerProxy{})),
491 )),
492 grpc.WithUnaryInterceptor(c.clientInterceptor),
493 // Set keealive parameter - use default grpc values
494 grpc.WithKeepaliveParams(keepalive.ClientParameters{
495 Time: c.monitorInterval,
496 Timeout: c.backoffMaxInterval,
497 PermitWithoutStream: true,
498 }),
499 )
500
501 if err == nil {
502 subCtx, cancel := context.WithTimeout(ctx, c.backoffMaxInterval)
503 defer cancel()
504 svc := handler(subCtx, conn)
505 if svc != nil {
506 c.connection = conn
507 c.service = svc
508 if p != nil {
509 p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusRunning)
510 }
511 logger.Infow(ctx, "connected-to-endpoint", log.Fields{"endpoint": c.apiEndPoint})
512 c.events <- eventConnected
513 return nil
514 }
515 }
516 logger.Warnw(ctx, "Failed to connect to endpoint",
517 log.Fields{
518 "endpoint": c.apiEndPoint,
519 "error": err,
520 })
521
522 if p != nil {
523 p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusFailed)
524 }
525 return fmt.Errorf("no connection to endpoint %s", c.apiEndPoint)
526}
527
528func (c *Client) closeConnection(ctx context.Context, p *probe.Probe) error {
529 if p != nil {
530 p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusStopped)
531 }
532
533 c.connectionLock.Lock()
534 defer c.connectionLock.Unlock()
535
536 if c.connection != nil {
537 err := c.connection.Close()
538 c.connection = nil
539 return err
540 }
541
542 return nil
543}
544
545func (c *Client) Stop(ctx context.Context) {
546 if !c.done {
547 c.events <- eventStopped
548 close(c.events)
549 c.done = true
550 }
551}
552
553// SetService is used for testing only
554func (c *Client) SetService(srv interface{}) {
555 c.connectionLock.Lock()
556 defer c.connectionLock.Unlock()
557 c.service = srv
558}
559
560func (c *Client) SubscribeForLiveness(callback func(timestamp time.Time)) {
561 c.livenessCallback = callback
562}