/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package afrouter

import (
	"context"
	"encoding/hex"
	"errors"
	"github.com/opencord/voltha-go/common/log"
	"google.golang.org/grpc"
	"io"
	"sync"
)

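// request tracks a single northbound gRPC request as it is fanned out to one
// or more southbound client streams. Request frames are buffered so that
// streams opened late can be caught up, and for non-streaming calls a single
// "active" stream is elected to supply the response.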
type request struct {
	mutex                    sync.Mutex
	activeResponseStreamOnce sync.Once
	setResponseHeaderOnce    sync.Once
	responseStreamMutex      sync.Mutex

	streams map[string]grpc.ClientStream

	requestFrameBacklog [][]byte
	responseErrChan     chan error
	sendClosed          bool

	backend             *backend
	ctx                 context.Context
	serverStream        grpc.ServerStream
	methodInfo          methodDetails
	requestFrame        *requestFrame
	responseFrame       *responseFrame
	isStreamingRequest  bool
	isStreamingResponse bool
}

// catchupRequestStreamThenForwardResponseStream must be called with request.mutex pre-locked
func (r *request) catchupRequestStreamThenForwardResponseStream(connName string, stream grpc.ClientStream) {
	r.streams[connName] = stream

	// prime new streams with any traffic they might have missed (non-streaming requests only)
	frame := *r.requestFrame // local copy of frame
	for _, payload := range r.requestFrameBacklog {
		frame.payload = payload
		if err := stream.SendMsg(&frame); err != nil {
			log.Debugf("Error on SendMsg: %s", err.Error())
			break
		}
	}
	if r.sendClosed {
		if err := stream.CloseSend(); err != nil {
			log.Debugf("Error on CloseSend: %s", err.Error())
		}
	}

	r.mutex.Unlock()

	r.forwardResponseStream(connName, stream)
}


// forwardResponseStream receives response frames from a southbound client
// stream and forwards them to the northbound server stream as appropriate
func (r *request) forwardResponseStream(connName string, stream grpc.ClientStream) {
	var queuedFrames [][]byte
	frame := *r.responseFrame // local copy of frame
	var err error
	activeStream := false
	for {
		err = stream.RecvMsg(&frame)
		// the first thread to reach this point (first to receive a response frame) will become the active stream
		r.activeResponseStreamOnce.Do(func() { activeStream = true })
		if err != nil {
			// this can be io.EOF, which is the success case
			break
		}

		if r.isStreamingResponse {
			// streaming response - send immediately
			if err = r.sendResponseFrame(stream, frame); err != nil {
				break
			}
		} else { // !r.isStreamingResponse

			if r.isStreamingRequest { // && !r.isStreamingResponse
				// queue the frame (only send the response when the last stream closes)
				queuedFrames = append(queuedFrames, frame.payload)
			} else { // !r.isStreamingRequest && !r.isStreamingResponse

				// only the active stream will respond
				if activeStream { // && !r.isStreamingRequest && !r.isStreamingResponse
					// send the response immediately
					if err = r.sendResponseFrame(stream, frame); err != nil {
						break
					}
				} else { // !activeStream && !r.isStreamingRequest && !r.isStreamingResponse
					// just read & discard until the stream dies
				}
			}
		}
	}

	log.Debugf("Closing stream to %s", connName)

	// io.EOF is the success case
	if err == io.EOF {
		err = nil
	}

	// NOTE: this double-lock (backend mutex, then request mutex) is fragile;
	// the same lock order must be used everywhere to avoid deadlock
	r.backend.mutex.Lock()
	r.mutex.Lock()
	delete(r.streams, connName)
	streamsLeft := len(r.streams)

	// if this is the active stream (for non-streaming requests), or the last remaining stream (for streaming requests)
	if (activeStream && !r.isStreamingRequest && !r.isStreamingResponse) || (streamsLeft == 0 && (r.isStreamingRequest || r.isStreamingResponse)) {
		// request is complete, cleanup
		delete(r.backend.activeRequests, r)
		r.mutex.Unlock()
		r.backend.mutex.Unlock()

		// send any queued frames we have (streaming request & !streaming response only, but no harm trying in other cases)
		for _, payload := range queuedFrames {
			if err != nil {
				// if there's been an error, don't try to send any more
				break
			}
			frame.payload = payload
			err = r.sendResponseFrame(stream, frame)
		}

		// We may have received Trailers as part of the call.
		r.serverStream.SetTrailer(stream.Trailer())

		// response stream complete
		r.responseErrChan <- err
	} else {
		r.mutex.Unlock()
		r.backend.mutex.Unlock()
	}
}

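// sendResponseFrame forwards a single response frame to the northbound server
// stream, writing the response headers (including any routing metadata)
// exactly once before the first frame is sent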
func (r *request) sendResponseFrame(stream grpc.ClientStream, f responseFrame) error {
	r.responseStreamMutex.Lock()
	defer r.responseStreamMutex.Unlock()

	// the header should only be set once, even if multiple streams can respond.
	setHeader := false
	r.setResponseHeaderOnce.Do(func() { setHeader = true })
	if setHeader {
		// This is a bit of a hack, but client-to-server headers are only readable after the first client msg
		// is received, yet must be written to the server stream before the first msg is flushed.
		// This is the only place to do it nicely.
		md, err := stream.Header()
		if err != nil {
			return err
		}
		// Update the metadata for the response.
		if f.metaKey != NoMeta {
			if f.metaVal == "" {
				// no explicit value was set; default to the backend name
				// (we could also always just do this)
				md.Set(f.metaKey, f.backend.name)
			} else {
				md.Set(f.metaKey, f.metaVal)
			}
		}
		if err := r.serverStream.SendHeader(md); err != nil {
			return err
		}
	}

	log.Debugf("Response frame %s", hex.EncodeToString(f.payload))

	return r.serverStream.SendMsg(&f)
}

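// sendAll forwards a request frame to every currently-open southbound stream,
// first recording it in the backlog (non-streaming requests only) so that
// late-starting streams can be caught up; it succeeds if at least one stream
// accepted the frame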
func (r *request) sendAll(frame *requestFrame) error {
	r.mutex.Lock()
	if !r.isStreamingRequest {
		// save frames of non-streaming requests, so we can catch up new streams
		r.requestFrameBacklog = append(r.requestFrameBacklog, frame.payload)
	}

	// send to all existing streams
	streams := make(map[string]grpc.ClientStream, len(r.streams))
	for n, s := range r.streams {
		streams[n] = s
	}
	r.mutex.Unlock()

	var rtrn error
	atLeastOne := false
	atLeastOneSuccess := false
	for _, stream := range streams {
		if err := stream.SendMsg(frame); err != nil {
			log.Debugf("Error on SendMsg: %s", err.Error())
			rtrn = err
		} else {
			atLeastOneSuccess = true
		}
		atLeastOne = true
	}
	// if at least one of the streams succeeded, declare success;
	// if none did, pick an error and return it.
	if atLeastOne {
		if atLeastOneSuccess {
			return nil
		} else {
			return rtrn
		}
	} else {
		err := errors.New("unable to send, all streams have closed")
		log.Error(err)
		return err
	}
}

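// forwardRequestStream pumps request frames from the northbound server stream
// to all southbound streams until the northbound sender closes or a send
// fails, then closes the send side of every southbound stream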
func (r *request) forwardRequestStream(src grpc.ServerStream) error {
	// The frame buffer already holds the result of a first RecvMsg, so the
	// first thing to do is send it to the list of client streams, and only
	// then read some more.
	frame := *r.requestFrame // local copy of frame
	var rtrn error
	for {
		// Send the message to each of the backend streams
		if err := r.sendAll(&frame); err != nil {
			log.Debugf("SendAll failed %s", err.Error())
			rtrn = err
			break
		}
		log.Debugf("Request frame %s", hex.EncodeToString(frame.payload))
		if err := src.RecvMsg(&frame); err != nil {
			rtrn = err // this can be io.EOF, which is the success case
			break
		}
	}

	r.mutex.Lock()
	log.Debug("Closing southbound streams")
	r.sendClosed = true
	for _, stream := range r.streams {
		if err := stream.CloseSend(); err != nil {
			log.Debugf("Error on CloseSend: %s", err.Error())
		}
	}
	r.mutex.Unlock()

	if rtrn != io.EOF {
		log.Debugf("s2cErr reporting %v", rtrn)
		return rtrn
	}
	log.Debug("s2cErr reporting EOF")
	// this is the successful case, where the sender has encountered io.EOF and won't be sending any more;
	// the clientStream->serverStream direction may continue sending, though.
	return nil
}