blob: 9300671d822c1b5114090b74cedfa81303f0f0f6 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14 */
15
16package vpagent
17
18import (
19 "context"
20 "io"
21
22 "github.com/golang/protobuf/ptypes/empty"
Tinoj Joseph1d108322022-07-13 10:07:39 +053023 "voltha-go-controller/log"
Naveen Sampath04696f72022-06-13 15:19:14 +053024 "google.golang.org/grpc"
25)
26
27func (vpa *VPAgent) receiveChangeEvents(ctx context.Context) {
28 logger.Debug(ctx, "receive-change-events-started")
29 // If we exit, assume disconnected
30 defer func() {
31 vpa.events <- vpaEventVolthaDisconnected
32 logger.Debug(ctx, "receive-change-events-finished")
33 }()
34 if vpa.volthaClient == nil {
35 logger.Error(ctx, "no-voltha-connection")
36 return
37 }
38 opt := grpc.EmptyCallOption{}
39 streamCtx, streamDone := context.WithCancel(context.Background())
40 defer streamDone()
41 vServiceClient := vpa.volthaClient.Get()
42 if vServiceClient == nil {
43 logger.Error(ctx, "Failed to get Voltha Service Client")
44 return
45 }
46
47 stream, err := vServiceClient.ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
48 if err != nil {
49 logger.Errorw(ctx, "Unable to establish Receive Change Event Stream",
50 log.Fields{"error": err})
51 return
52 }
53
54top:
55 for {
56 select {
57 case <-ctx.Done():
58 logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
59 break top
60 default:
61 ce, err := stream.Recv()
62 if err == io.EOF {
63 logger.Infow(ctx, "EOF for receiveChangeEvents stream, reconnecting", log.Fields{"err": err})
64 stream, err = vServiceClient.ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
65 if err != nil {
66 logger.Errorw(ctx, "Unable to establish Receive Change Event Stream",
67 log.Fields{"error": err})
68 return
69 }
70 continue
71 }
72 if isConnCanceled(err) {
73 logger.Errorw(ctx, "error receiving change event",
74 log.Fields{"error": err})
75 break top
76 } else if err != nil {
77 logger.Infow(ctx, "Ignoring unhandled error", log.Fields{"err": err})
78 continue
79 }
80 vpa.changeEventChannel <- ce
81 logger.Debug(ctx, "receive-change-event-queued")
82 }
83 }
84}
85
86func (vpa *VPAgent) handleChangeEvents(ctx context.Context) {
87 logger.Debug(ctx, "handle-change-event-started")
88
89top:
90 for {
91 select {
92 case <-ctx.Done():
93 logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
94 break top
95 case changeEvent := <-vpa.changeEventChannel:
96 logger.Debugw(ctx, "Change Event", log.Fields{"Device": changeEvent.Id})
97 if vpc := vpa.getVPClient(changeEvent.Id); vpc != nil {
98 if err:= vpc.ChangeEvent(changeEvent); err != nil {
99 logger.Errorw(ctx, "error handling Change Event", log.Fields{"Error": err, "Device": changeEvent.Id})
100 }
101 }
102 }
103 }
104
105 logger.Debug(ctx, "handle-change-event-finsihed")
106}