blob: 3e536b9277fab10fe301eade608b9e5c5dbdecee [file] [log] [blame]
/*
* Copyright 2022-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */
15
16package vpagent
17
18import (
19 "context"
20 "io"
21
Tinoj Joseph1d108322022-07-13 10:07:39 +053022 "voltha-go-controller/log"
Akash Soni53da2852023-03-15 00:31:31 +053023
24 "github.com/golang/protobuf/ptypes/empty"
Naveen Sampath04696f72022-06-13 15:19:14 +053025 "google.golang.org/grpc"
26)
27
28func (vpa *VPAgent) receiveChangeEvents(ctx context.Context) {
29 logger.Debug(ctx, "receive-change-events-started")
30 // If we exit, assume disconnected
31 defer func() {
32 vpa.events <- vpaEventVolthaDisconnected
33 logger.Debug(ctx, "receive-change-events-finished")
34 }()
35 if vpa.volthaClient == nil {
36 logger.Error(ctx, "no-voltha-connection")
37 return
38 }
39 opt := grpc.EmptyCallOption{}
40 streamCtx, streamDone := context.WithCancel(context.Background())
41 defer streamDone()
42 vServiceClient := vpa.volthaClient.Get()
43 if vServiceClient == nil {
44 logger.Error(ctx, "Failed to get Voltha Service Client")
45 return
46 }
47
48 stream, err := vServiceClient.ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
49 if err != nil {
50 logger.Errorw(ctx, "Unable to establish Receive Change Event Stream",
51 log.Fields{"error": err})
52 return
53 }
54
55top:
56 for {
57 select {
58 case <-ctx.Done():
59 logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
60 break top
61 default:
62 ce, err := stream.Recv()
63 if err == io.EOF {
Akash Soni53da2852023-03-15 00:31:31 +053064 logger.Warnw(ctx, "EOF for receiveChangeEvents stream, reconnecting", log.Fields{"err": err})
Naveen Sampath04696f72022-06-13 15:19:14 +053065 stream, err = vServiceClient.ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
66 if err != nil {
67 logger.Errorw(ctx, "Unable to establish Receive Change Event Stream",
68 log.Fields{"error": err})
69 return
70 }
71 continue
72 }
73 if isConnCanceled(err) {
74 logger.Errorw(ctx, "error receiving change event",
75 log.Fields{"error": err})
76 break top
77 } else if err != nil {
78 logger.Infow(ctx, "Ignoring unhandled error", log.Fields{"err": err})
79 continue
80 }
81 vpa.changeEventChannel <- ce
82 logger.Debug(ctx, "receive-change-event-queued")
83 }
84 }
85}
86
87func (vpa *VPAgent) handleChangeEvents(ctx context.Context) {
88 logger.Debug(ctx, "handle-change-event-started")
89
90top:
91 for {
92 select {
93 case <-ctx.Done():
94 logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
95 break top
96 case changeEvent := <-vpa.changeEventChannel:
97 logger.Debugw(ctx, "Change Event", log.Fields{"Device": changeEvent.Id})
98 if vpc := vpa.getVPClient(changeEvent.Id); vpc != nil {
Akash Soni53da2852023-03-15 00:31:31 +053099 if err := vpc.ChangeEvent(changeEvent); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530100 logger.Errorw(ctx, "error handling Change Event", log.Fields{"Error": err, "Device": changeEvent.Id})
101 }
102 }
103 }
104 }
105
106 logger.Debug(ctx, "handle-change-event-finsihed")
107}