blob: 8fc15aa649fb2f0173d324f98ba4a7a118b3b4a0 [file] [log] [blame]
Kent Hagerman03b58992019-08-29 17:21:03 -04001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package afrouter
18
19import (
20 "context"
21 "encoding/hex"
22 "errors"
23 "github.com/opencord/voltha-go/common/log"
24 "google.golang.org/grpc"
Kent Hagermana61567e2019-09-18 16:42:59 -040025 "google.golang.org/grpc/codes"
26 "google.golang.org/grpc/status"
Kent Hagerman03b58992019-08-29 17:21:03 -040027 "io"
28 "sync"
29)
30
31type request struct {
32 mutex sync.Mutex
33 activeResponseStreamOnce sync.Once
34 setResponseHeaderOnce sync.Once
35 responseStreamMutex sync.Mutex
36
37 streams map[string]grpc.ClientStream
38
39 requestFrameBacklog [][]byte
40 responseErrChan chan error
41 sendClosed bool
42
43 backend *backend
44 ctx context.Context
45 serverStream grpc.ServerStream
46 methodInfo methodDetails
47 requestFrame *requestFrame
48 responseFrame *responseFrame
49 isStreamingRequest bool
50 isStreamingResponse bool
51}
52
// transactionNotAcquiredErrorString is the textual form of the gRPC Canceled
// status error with description "transaction-not-acquired". Errors received
// from a backend stream are compared against this text to recognize a
// responder whose output must be ignored.
var transactionNotAcquiredErrorString = status.Error(codes.Canceled, "transaction-not-acquired").Error()
54
Kent Hagerman03b58992019-08-29 17:21:03 -040055// catchupRequestStreamThenForwardResponseStream must be called with request.mutex pre-locked
56func (r *request) catchupRequestStreamThenForwardResponseStream(connName string, stream grpc.ClientStream) {
57 r.streams[connName] = stream
58
59 // prime new streams with any traffic they might have missed (non-streaming requests only)
60 frame := *r.requestFrame // local copy of frame
61 for _, payload := range r.requestFrameBacklog {
62 frame.payload = payload
63 if err := stream.SendMsg(&frame); err != nil {
64 log.Debugf("Error on SendMsg: %s", err.Error())
65 break
66 }
67 }
68 if r.sendClosed {
69 stream.CloseSend()
70 }
71
72 r.mutex.Unlock()
73
74 r.forwardResponseStream(connName, stream)
75}
76
77// forwardResponseStream forwards the response stream
78func (r *request) forwardResponseStream(connName string, stream grpc.ClientStream) {
79 var queuedFrames [][]byte
80 frame := *r.responseFrame
81 var err error
82 activeStream := false
83 for {
84 err = stream.RecvMsg(&frame)
Kent Hagermana61567e2019-09-18 16:42:59 -040085 // if this is an inactive responder, ignore everything it sends
86 if err != nil && err.Error() == transactionNotAcquiredErrorString {
87 break
88 }
Kent Hagerman03b58992019-08-29 17:21:03 -040089 // the first thread to reach this point (first to receive a response frame) will become the active stream
90 r.activeResponseStreamOnce.Do(func() { activeStream = true })
91 if err != nil {
92 // this can be io.EOF which is the success case
93 break
94 }
95
96 if r.isStreamingResponse {
97 // streaming response - send immediately
98 if err = r.sendResponseFrame(stream, frame); err != nil {
99 break
100 }
101 } else { // !r.isStreamingResponse
102
103 if r.isStreamingRequest { // && !r.isStreamingResponse
104 // queue the frame (only send response when the last stream closes)
105 queuedFrames = append(queuedFrames, frame.payload)
106 } else { // !r.isStreamingRequest && !r.isStreamingResponse
107
108 // only the active stream will respond
109 if activeStream { // && !r.isStreamingRequest && !r.isStreamingResponse
110 // send the response immediately
111 if err = r.sendResponseFrame(stream, frame); err != nil {
112 break
113 }
114 } else { // !activeStream && !r.isStreamingRequest && !r.isStreamingResponse
115 // just read & discard until the stream dies
116 }
117 }
118 }
119 }
120
121 log.Debugf("Closing stream to %s", connName)
122
123 // io.EOF is the success case
124 if err == io.EOF {
125 err = nil
126 }
127
128 // this double-lock sets off alarm bells in my head
129 r.backend.mutex.Lock()
130 r.mutex.Lock()
131 delete(r.streams, connName)
132 streamsLeft := len(r.streams)
133
Kent Hagermana61567e2019-09-18 16:42:59 -0400134 // handle the case where no cores are the active responder. Should never happen, but just in case...
135 if streamsLeft == 0 {
136 r.activeResponseStreamOnce.Do(func() { activeStream = true })
137 }
138
Kent Hagerman03b58992019-08-29 17:21:03 -0400139 // if this the active stream (for non-streaming requests), or this is the last stream (for streaming requests)
140 if (activeStream && !r.isStreamingRequest && !r.isStreamingResponse) || (streamsLeft == 0 && (r.isStreamingRequest || r.isStreamingResponse)) {
141 // request is complete, cleanup
142 delete(r.backend.activeRequests, r)
143 r.mutex.Unlock()
144 r.backend.mutex.Unlock()
145
146 // send any queued frames we have (streaming request & !streaming response only, but no harm trying in other cases)
147 for _, payload := range queuedFrames {
148 if err != nil {
149 // if there's been an error, don't try to send anymore
150 break
151 }
152 frame.payload = payload
153 err = r.sendResponseFrame(stream, frame)
154 }
155
156 // We may have received Trailers as part of the call.
157 r.serverStream.SetTrailer(stream.Trailer())
158
159 // response stream complete
160 r.responseErrChan <- err
161 } else {
162 r.mutex.Unlock()
163 r.backend.mutex.Unlock()
164 }
165}
166
167func (r *request) sendResponseFrame(stream grpc.ClientStream, f responseFrame) error {
168 r.responseStreamMutex.Lock()
169 defer r.responseStreamMutex.Unlock()
170
171 // the header should only be set once, even if multiple streams can respond.
172 setHeader := false
173 r.setResponseHeaderOnce.Do(func() { setHeader = true })
174 if setHeader {
175 // This is a bit of a hack, but client to server headers are only readable after first client msg is
176 // received but must be written to server stream before the first msg is flushed.
177 // This is the only place to do it nicely.
178 md, err := stream.Header()
179 if err != nil {
180 return err
181 }
182 // Update the metadata for the response.
183 if f.metaKey != NoMeta {
184 if f.metaVal == "" {
185 // We could also always just do this
186 md.Set(f.metaKey, f.backend.name)
187 } else {
188 md.Set(f.metaKey, f.metaVal)
189 }
190 }
191 if err := r.serverStream.SendHeader(md); err != nil {
192 return err
193 }
194 }
195
196 log.Debugf("Response frame %s", hex.EncodeToString(f.payload))
197
198 return r.serverStream.SendMsg(&f)
199}
200
201func (r *request) sendAll(frame *requestFrame) error {
202 r.mutex.Lock()
203 if !r.isStreamingRequest {
204 // save frames of non-streaming requests, so we can catchup new streams
205 r.requestFrameBacklog = append(r.requestFrameBacklog, frame.payload)
206 }
207
208 // send to all existing streams
209 streams := make(map[string]grpc.ClientStream, len(r.streams))
210 for n, s := range r.streams {
211 streams[n] = s
212 }
213 r.mutex.Unlock()
214
215 var rtrn error
216 atLeastOne := false
217 atLeastOneSuccess := false
218 for _, stream := range streams {
219 if err := stream.SendMsg(frame); err != nil {
220 log.Debugf("Error on SendMsg: %s", err.Error())
221 rtrn = err
222 } else {
223 atLeastOneSuccess = true
224 }
225 atLeastOne = true
226 }
227 // If one of the streams succeeded, declare success
228 // if none did pick an error and return it.
229 if atLeastOne {
230 if atLeastOneSuccess {
231 return nil
232 } else {
233 return rtrn
234 }
235 } else {
236 err := errors.New("unable to send, all streams have closed")
237 log.Error(err)
238 return err
239 }
240}
241
242func (r *request) forwardRequestStream(src grpc.ServerStream) error {
243 // The frame buffer already has the results of a first
244 // RecvMsg in it so the first thing to do is to
245 // send it to the list of client streams and only
246 // then read some more.
247 frame := *r.requestFrame // local copy of frame
248 var rtrn error
249 for {
250 // Send the message to each of the backend streams
251 if err := r.sendAll(&frame); err != nil {
252 log.Debugf("SendAll failed %s", err.Error())
253 rtrn = err
254 break
255 }
256 log.Debugf("Request frame %s", hex.EncodeToString(frame.payload))
257 if err := src.RecvMsg(&frame); err != nil {
258 rtrn = err // this can be io.EOF which is happy case
259 break
260 }
261 }
262
263 r.mutex.Lock()
264 log.Debug("Closing southbound streams")
265 r.sendClosed = true
266 for _, stream := range r.streams {
267 stream.CloseSend()
268 }
269 r.mutex.Unlock()
270
271 if rtrn != io.EOF {
272 log.Debugf("s2cErr reporting %v", rtrn)
273 return rtrn
274 }
275 log.Debug("s2cErr reporting EOF")
276 // this is the successful case where the sender has encountered io.EOF, and won't be sending anymore.
277 // the clientStream>serverStream may continue sending though.
278 return nil
279}