blob: 458ed06b638706501e27f46293d7f50b47963d5b [file] [log] [blame]
Kent Hagerman1e9061e2019-05-21 16:01:21 -04001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package afrouter
18
19import (
20 "context"
21 "errors"
22 "github.com/opencord/voltha-go/common/log"
23 "google.golang.org/grpc"
24 "google.golang.org/grpc/metadata"
25 "sort"
26 "sync"
27)
28
29type streams struct {
30 mutex sync.Mutex
31 activeStream *stream
32 streams map[string]*stream
33 sortedStreams []*stream
34}
35
36type stream struct {
37 stream grpc.ClientStream
38 ctx context.Context
39 cancel context.CancelFunc
40 ok2Close chan struct{}
41 c2sReturn chan error
42 s2cReturn error
43}
44
45func (s *streams) clientCancel() {
46 for _, strm := range s.streams {
47 if strm != nil {
48 strm.cancel()
49 }
50 }
51}
52
53func (s *streams) closeSend() {
54 for _, strm := range s.streams {
55 if strm != nil {
56 <-strm.ok2Close
57 log.Debug("Closing southbound stream")
58 strm.stream.CloseSend()
59 }
60 }
61}
62
63func (s *streams) trailer() metadata.MD {
64 return s.activeStream.stream.Trailer()
65}
66
67func (s *streams) getActive() *stream {
68 s.mutex.Lock()
69 defer s.mutex.Unlock()
70 return s.activeStream
71}
72
73func (s *streams) setThenGetActive(strm *stream) *stream {
74 s.mutex.Lock()
75 defer s.mutex.Unlock()
76 if s.activeStream == nil {
77 s.activeStream = strm
78 }
79 return s.activeStream
80}
81
82func (s *streams) forwardClientToServer(dst grpc.ServerStream, f *sbFrame) chan error {
83 fc2s := func(srcS *stream) {
84 for i := 0; ; i++ {
85 if err := srcS.stream.RecvMsg(f); err != nil {
86 if s.setThenGetActive(srcS) == srcS {
87 srcS.c2sReturn <- err // this can be io.EOF which is the success case
88 } else {
89 srcS.c2sReturn <- nil // Inactive responder
90 }
91 close(srcS.ok2Close)
92 break
93 }
94 if s.setThenGetActive(srcS) != srcS {
95 srcS.c2sReturn <- nil
96 continue
97 }
98 if i == 0 {
99 // This is a bit of a hack, but client to server headers are only readable after first client msg is
100 // received but must be written to server stream before the first msg is flushed.
101 // This is the only place to do it nicely.
102 md, err := srcS.stream.Header()
103 if err != nil {
104 srcS.c2sReturn <- err
105 break
106 }
107 // Update the metadata for the response.
108 if f.metaKey != NoMeta {
109 if f.metaVal == "" {
110 // We could also alsways just do this
111 md.Set(f.metaKey, f.backend.name)
112 } else {
113 md.Set(f.metaKey, f.metaVal)
114 }
115 }
116 if err := dst.SendHeader(md); err != nil {
117 srcS.c2sReturn <- err
118 break
119 }
120 }
121 log.Debugf("Northbound frame %v", f.payload)
122 if err := dst.SendMsg(f); err != nil {
123 srcS.c2sReturn <- err
124 break
125 }
126 }
127 }
128
129 // There should be AT LEAST one open stream at this point
130 // if there isn't its a grave error in the code and it will
131 // cause this thread to block here so check for it and
132 // don't let the lock up happen but report the error
133 ret := make(chan error, 1)
134 agg := make(chan *stream)
135 atLeastOne := false
136 for _, strm := range s.streams {
137 if strm != nil {
138 go fc2s(strm)
139 go func(s *stream) { // Wait on result and aggregate
140 r := <-s.c2sReturn // got the return code
141 if r == nil {
142 return // We're the redundant stream, just die
143 }
144 s.c2sReturn <- r // put it back to pass it along
145 agg <- s // send the stream to the aggregator
146 }(strm)
147 atLeastOne = true
148 }
149 }
150 if atLeastOne == true {
151 go func() { // Wait on aggregated result
152 s := <-agg
153 ret <- <-s.c2sReturn
154 }()
155 } else {
156 err := errors.New("There are no open streams. Unable to forward message.")
157 log.Error(err)
158 ret <- err
159 }
160 return ret
161}
162
163func (s *streams) sendAll(f *nbFrame) error {
164 var rtrn error
165
166 atLeastOne := false
167 for _, strm := range s.sortedStreams {
168 if strm != nil {
169 if err := strm.stream.SendMsg(f); err != nil {
170 log.Debugf("Error on SendMsg: %s", err.Error())
171 strm.s2cReturn = err
172 }
173 atLeastOne = true
174 } else {
175 log.Debugf("Nil stream")
176 }
177 }
178 // If one of the streams succeeded, declare success
179 // if none did pick an error and return it.
180 if atLeastOne == true {
181 for _, strm := range s.sortedStreams {
182 if strm != nil {
183 rtrn = strm.s2cReturn
184 if rtrn == nil {
185 return rtrn
186 }
187 }
188 }
189 return rtrn
190 } else {
191 rtrn = errors.New("There are no open streams, this should never happen")
192 log.Error(rtrn)
193 }
194 return rtrn
195}
196
197func (s *streams) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error {
198 ret := make(chan error, 1)
199 go func() {
200 // The frame buffer already has the results of a first
201 // RecvMsg in it so the first thing to do is to
202 // send it to the list of client streams and only
203 // then read some more.
204 for i := 0; ; i++ {
205 // Send the message to each of the backend streams
206 if err := s.sendAll(f); err != nil {
207 ret <- err
208 log.Debugf("SendAll failed %s", err.Error())
209 break
210 }
211 log.Debugf("Southbound frame %v", f.payload)
212 if err := src.RecvMsg(f); err != nil {
213 ret <- err // this can be io.EOF which is happy case
214 break
215 }
216 }
217 }()
218 return ret
219}
220
221func (s *streams) sortStreams() {
222 var tmpKeys []string
223 for k := range s.streams {
224 tmpKeys = append(tmpKeys, k)
225 }
226 sort.Strings(tmpKeys)
227 for _, v := range tmpKeys {
228 s.sortedStreams = append(s.sortedStreams, s.streams[v])
229 }
230}