blob: dbd9968f0653c4b138109c6fc72b7a850d27318f [file] [log] [blame]
Scott Bakere7144bc2019-10-01 14:16:47 -07001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package afrouter
18
19import (
20 "context"
21 "encoding/hex"
22 "errors"
23 "github.com/opencord/voltha-go/common/log"
24 "google.golang.org/grpc"
25 "google.golang.org/grpc/codes"
26 "google.golang.org/grpc/status"
27 "io"
28 "sync"
29)
30
31type request struct {
32 mutex sync.Mutex
33 activeResponseStreamOnce sync.Once
34 setResponseHeaderOnce sync.Once
35 responseStreamMutex sync.Mutex
36
37 streams map[string]grpc.ClientStream
38
39 requestFrameBacklog [][]byte
40 responseErrChan chan error
41 sendClosed bool
42
43 backend *backend
44 ctx context.Context
45 serverStream grpc.ServerStream
46 methodInfo methodDetails
47 requestFrame *requestFrame
48 responseFrame *responseFrame
49 isStreamingRequest bool
50 isStreamingResponse bool
51}
52
53var transactionNotAcquiredErrorString = status.Error(codes.Canceled, "transaction-not-acquired").Error()
54
55// catchupRequestStreamThenForwardResponseStream must be called with request.mutex pre-locked
56func (r *request) catchupRequestStreamThenForwardResponseStream(connName string, stream grpc.ClientStream) {
57 r.streams[connName] = stream
58
59 // prime new streams with any traffic they might have missed (non-streaming requests only)
60 frame := *r.requestFrame // local copy of frame
61 for _, payload := range r.requestFrameBacklog {
62 frame.payload = payload
63 if err := stream.SendMsg(&frame); err != nil {
64 log.Debugf("Error on SendMsg: %s", err.Error())
65 break
66 }
67 }
68 if r.sendClosed {
Scott Baker4989fe92019-10-09 17:03:06 -070069 err := stream.CloseSend()
70 if err != nil {
71 log.Errorf("%v", err)
72 }
Scott Bakere7144bc2019-10-01 14:16:47 -070073 }
74
75 r.mutex.Unlock()
76
77 r.forwardResponseStream(connName, stream)
78}
79
80// forwardResponseStream forwards the response stream
81func (r *request) forwardResponseStream(connName string, stream grpc.ClientStream) {
82 var queuedFrames [][]byte
83 frame := *r.responseFrame
84 var err error
85 activeStream := false
86 for {
87 err = stream.RecvMsg(&frame)
88 // if this is an inactive responder, ignore everything it sends
89 if err != nil && err.Error() == transactionNotAcquiredErrorString {
90 break
91 }
92 // the first thread to reach this point (first to receive a response frame) will become the active stream
93 r.activeResponseStreamOnce.Do(func() { activeStream = true })
94 if err != nil {
95 // this can be io.EOF which is the success case
96 break
97 }
98
99 if r.isStreamingResponse {
100 // streaming response - send immediately
101 if err = r.sendResponseFrame(stream, frame); err != nil {
102 break
103 }
104 } else { // !r.isStreamingResponse
105
106 if r.isStreamingRequest { // && !r.isStreamingResponse
107 // queue the frame (only send response when the last stream closes)
108 queuedFrames = append(queuedFrames, frame.payload)
109 } else { // !r.isStreamingRequest && !r.isStreamingResponse
110
111 // only the active stream will respond
112 if activeStream { // && !r.isStreamingRequest && !r.isStreamingResponse
113 // send the response immediately
114 if err = r.sendResponseFrame(stream, frame); err != nil {
115 break
116 }
Scott Bakere7144bc2019-10-01 14:16:47 -0700117 }
Scott Baker4989fe92019-10-09 17:03:06 -0700118 // !activeStream && !r.isStreamingRequest && !r.isStreamingResponse
119 // just read & discard until the stream dies
Scott Bakere7144bc2019-10-01 14:16:47 -0700120 }
121 }
122 }
123
124 log.Debugf("Closing stream to %s", connName)
125
126 // io.EOF is the success case
127 if err == io.EOF {
128 err = nil
129 }
130
131 // this double-lock sets off alarm bells in my head
132 r.backend.mutex.Lock()
133 r.mutex.Lock()
134 delete(r.streams, connName)
135 streamsLeft := len(r.streams)
136
137 // handle the case where no cores are the active responder. Should never happen, but just in case...
138 if streamsLeft == 0 {
139 r.activeResponseStreamOnce.Do(func() { activeStream = true })
140 }
141
142 // if this the active stream (for non-streaming requests), or this is the last stream (for streaming requests)
143 if (activeStream && !r.isStreamingRequest && !r.isStreamingResponse) || (streamsLeft == 0 && (r.isStreamingRequest || r.isStreamingResponse)) {
144 // request is complete, cleanup
145 delete(r.backend.activeRequests, r)
146 r.mutex.Unlock()
147 r.backend.mutex.Unlock()
148
149 // send any queued frames we have (streaming request & !streaming response only, but no harm trying in other cases)
150 for _, payload := range queuedFrames {
151 if err != nil {
152 // if there's been an error, don't try to send anymore
153 break
154 }
155 frame.payload = payload
156 err = r.sendResponseFrame(stream, frame)
157 }
158
159 // We may have received Trailers as part of the call.
160 r.serverStream.SetTrailer(stream.Trailer())
161
162 // response stream complete
163 r.responseErrChan <- err
164 } else {
165 r.mutex.Unlock()
166 r.backend.mutex.Unlock()
167 }
168}
169
170func (r *request) sendResponseFrame(stream grpc.ClientStream, f responseFrame) error {
171 r.responseStreamMutex.Lock()
172 defer r.responseStreamMutex.Unlock()
173
174 // the header should only be set once, even if multiple streams can respond.
175 setHeader := false
176 r.setResponseHeaderOnce.Do(func() { setHeader = true })
177 if setHeader {
178 // This is a bit of a hack, but client to server headers are only readable after first client msg is
179 // received but must be written to server stream before the first msg is flushed.
180 // This is the only place to do it nicely.
181 md, err := stream.Header()
182 if err != nil {
183 return err
184 }
185 // Update the metadata for the response.
186 if f.metaKey != NoMeta {
187 if f.metaVal == "" {
188 // We could also always just do this
189 md.Set(f.metaKey, f.backend.name)
190 } else {
191 md.Set(f.metaKey, f.metaVal)
192 }
193 }
194 if err := r.serverStream.SendHeader(md); err != nil {
195 return err
196 }
197 }
198
199 log.Debugf("Response frame %s", hex.EncodeToString(f.payload))
200
201 return r.serverStream.SendMsg(&f)
202}
203
204func (r *request) sendAll(frame *requestFrame) error {
205 r.mutex.Lock()
206 if !r.isStreamingRequest {
207 // save frames of non-streaming requests, so we can catchup new streams
208 r.requestFrameBacklog = append(r.requestFrameBacklog, frame.payload)
209 }
210
211 // send to all existing streams
212 streams := make(map[string]grpc.ClientStream, len(r.streams))
213 for n, s := range r.streams {
214 streams[n] = s
215 }
216 r.mutex.Unlock()
217
218 var rtrn error
219 atLeastOne := false
220 atLeastOneSuccess := false
221 for _, stream := range streams {
222 if err := stream.SendMsg(frame); err != nil {
223 log.Debugf("Error on SendMsg: %s", err.Error())
224 rtrn = err
225 } else {
226 atLeastOneSuccess = true
227 }
228 atLeastOne = true
229 }
230 // If one of the streams succeeded, declare success
231 // if none did pick an error and return it.
232 if atLeastOne {
233 if atLeastOneSuccess {
234 return nil
235 } else {
236 return rtrn
237 }
238 } else {
239 err := errors.New("unable to send, all streams have closed")
240 log.Error(err)
241 return err
242 }
243}
244
245func (r *request) forwardRequestStream(src grpc.ServerStream) error {
246 // The frame buffer already has the results of a first
247 // RecvMsg in it so the first thing to do is to
248 // send it to the list of client streams and only
249 // then read some more.
250 frame := *r.requestFrame // local copy of frame
251 var rtrn error
252 for {
253 // Send the message to each of the backend streams
254 if err := r.sendAll(&frame); err != nil {
255 log.Debugf("SendAll failed %s", err.Error())
256 rtrn = err
257 break
258 }
259 log.Debugf("Request frame %s", hex.EncodeToString(frame.payload))
260 if err := src.RecvMsg(&frame); err != nil {
261 rtrn = err // this can be io.EOF which is happy case
262 break
263 }
264 }
265
266 r.mutex.Lock()
267 log.Debug("Closing southbound streams")
268 r.sendClosed = true
269 for _, stream := range r.streams {
Scott Baker4989fe92019-10-09 17:03:06 -0700270 err := stream.CloseSend()
271 if err != nil {
272 log.Errorf("%v", err)
273 }
Scott Bakere7144bc2019-10-01 14:16:47 -0700274 }
275 r.mutex.Unlock()
276
277 if rtrn != io.EOF {
278 log.Debugf("s2cErr reporting %v", rtrn)
279 return rtrn
280 }
281 log.Debug("s2cErr reporting EOF")
282 // this is the successful case where the sender has encountered io.EOF, and won't be sending anymore.
283 // the clientStream>serverStream may continue sending though.
284 return nil
285}