blob: 4a2f7c3e3560fe96ae6b6323d38a899524014d8f [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14 */
15
16package vpagent
17
18import (
19 "context"
20 "io"
21
22 "github.com/golang/protobuf/ptypes/empty"
23 "github.com/opencord/voltha-lib-go/v7/pkg/log"
24 "google.golang.org/grpc"
25)
26
27func (vpa *VPAgent) receivePacketsIn(ctx context.Context) {
28 logger.Debug(ctx, "receive-packets-in-started")
29 // If we exit, assume disconnected
30 defer func() {
31 vpa.events <- vpaEventVolthaDisconnected
32 logger.Debug(ctx, "receive-packets-in-finished")
33 }()
34 if vpa.volthaClient == nil {
35 logger.Error(ctx, "no-voltha-connection")
36 return
37 }
38 opt := grpc.EmptyCallOption{}
39 streamCtx, streamDone := context.WithCancel(context.Background())
40 defer streamDone()
41 stream, err := vpa.volthaClient.Get().ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
42 if err != nil {
43 logger.Errorw(ctx, "Unable to establish Receive PacketIn Stream",
44 log.Fields{"error": err})
45 return
46 }
47
48top:
49
50 for {
51 select {
52 case <-ctx.Done():
53 logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
54 break top
55 default:
56 pkt, err := stream.Recv()
57 if err == io.EOF {
58 logger.Infow(ctx, "EOF for receivePacketsIn stream, reconnecting", log.Fields{"err": err})
59 stream, err = vpa.volthaClient.Get().ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
60 if err != nil {
61 logger.Errorw(ctx, "Unable to establish Receive PacketIn Stream",
62 log.Fields{"error": err})
63 return
64 }
65 continue
66 }
67
68 if isConnCanceled(err) {
69 logger.Errorw(ctx, "error receiving packet",
70 log.Fields{"error": err})
71 break top
72 } else if err != nil {
73 logger.Infow(ctx, "Ignoring unhandled error", log.Fields{"err": err})
74 continue
75 }
76 vpa.packetInChannel <- pkt
77 }
78 }
79}
80
81func (vpa *VPAgent) handlePacketsIn(ctx context.Context) {
82 logger.Debug(ctx, "handle-packets-in-started")
83top:
84 for {
85 select {
86 case <-ctx.Done():
87 logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
88 break top
89 case packet := <-vpa.packetInChannel:
90 if vpc := vpa.getVPClient(packet.Id); vpc != nil {
91 vpc.PacketIn(packet)
92 }
93 }
94 }
95 logger.Debug(ctx, "handle-packets-in-finished")
96}