blob: a0209f25e9cbcdf37b2208896c3126fd6fcb105a [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14 */
15
16package vpagent
17
18import (
19 "context"
20 "io"
21
Tinoj Joseph1d108322022-07-13 10:07:39 +053022 "voltha-go-controller/log"
vinokuma926cb3e2023-03-29 11:41:06 +053023
24 "github.com/golang/protobuf/ptypes/empty"
Naveen Sampath04696f72022-06-13 15:19:14 +053025 "google.golang.org/grpc"
26)
27
28func (vpa *VPAgent) receivePacketsIn(ctx context.Context) {
29 logger.Debug(ctx, "receive-packets-in-started")
30 // If we exit, assume disconnected
31 defer func() {
32 vpa.events <- vpaEventVolthaDisconnected
33 logger.Debug(ctx, "receive-packets-in-finished")
34 }()
35 if vpa.volthaClient == nil {
36 logger.Error(ctx, "no-voltha-connection")
37 return
38 }
39 opt := grpc.EmptyCallOption{}
40 streamCtx, streamDone := context.WithCancel(context.Background())
41 defer streamDone()
42 stream, err := vpa.volthaClient.Get().ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
43 if err != nil {
44 logger.Errorw(ctx, "Unable to establish Receive PacketIn Stream",
45 log.Fields{"error": err})
46 return
47 }
48
49top:
50
51 for {
52 select {
53 case <-ctx.Done():
54 logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
55 break top
56 default:
57 pkt, err := stream.Recv()
58 if err == io.EOF {
59 logger.Infow(ctx, "EOF for receivePacketsIn stream, reconnecting", log.Fields{"err": err})
60 stream, err = vpa.volthaClient.Get().ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
61 if err != nil {
62 logger.Errorw(ctx, "Unable to establish Receive PacketIn Stream",
63 log.Fields{"error": err})
64 return
65 }
66 continue
67 }
68
69 if isConnCanceled(err) {
70 logger.Errorw(ctx, "error receiving packet",
71 log.Fields{"error": err})
72 break top
73 } else if err != nil {
74 logger.Infow(ctx, "Ignoring unhandled error", log.Fields{"err": err})
75 continue
76 }
77 vpa.packetInChannel <- pkt
78 }
79 }
80}
81
82func (vpa *VPAgent) handlePacketsIn(ctx context.Context) {
83 logger.Debug(ctx, "handle-packets-in-started")
84top:
85 for {
86 select {
87 case <-ctx.Done():
88 logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
89 break top
90 case packet := <-vpa.packetInChannel:
91 if vpc := vpa.getVPClient(packet.Id); vpc != nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +053092 vpc.PacketIn(ctx, packet)
Naveen Sampath04696f72022-06-13 15:19:14 +053093 }
94 }
95 }
96 logger.Debug(ctx, "handle-packets-in-finished")
97}