/*
 * Copyright 2019-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package afrouter

import (
	"context"
	"encoding/hex"
	"errors"
	"github.com/opencord/voltha-go/common/log"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"io"
	"sync"
)

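// request tracks a single northbound gRPC call as it is fanned out to one or more
// southbound (backend) client streams. Request frames are buffered so that streams
// opened late can be caught up, and the first backend to return a response frame is
// selected as the active responder.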
type request struct {
	mutex                    sync.Mutex
	activeResponseStreamOnce sync.Once
	setResponseHeaderOnce    sync.Once
	responseStreamMutex      sync.Mutex

	streams map[string]grpc.ClientStream

	requestFrameBacklog [][]byte
	responseErrChan     chan error
	sendClosed          bool

	backend             *backend
	ctx                 context.Context
	serverStream        grpc.ServerStream
	methodInfo          methodDetails
	requestFrame        *requestFrame
	responseFrame       *responseFrame
	isStreamingRequest  bool
	isStreamingResponse bool
}

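// transactionNotAcquiredErrorString identifies responses from a backend that did not
// acquire the transaction; such streams are treated as inactive responders and their
// output is discarded.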
var transactionNotAcquiredErrorString = status.Error(codes.Canceled, "transaction-not-acquired").Error()

// catchupRequestStreamThenForwardResponseStream registers a new backend stream, replays any
// buffered request frames to it, and then forwards its response stream northbound.
// It must be called with request.mutex pre-locked.
func (r *request) catchupRequestStreamThenForwardResponseStream(connName string, stream grpc.ClientStream) {
	r.streams[connName] = stream

	// prime new streams with any traffic they might have missed (non-streaming requests only)
	frame := *r.requestFrame // local copy of frame
	for _, payload := range r.requestFrameBacklog {
		frame.payload = payload
		if err := stream.SendMsg(&frame); err != nil {
			log.Debugf("Error on SendMsg: %s", err.Error())
			break
		}
	}
	if r.sendClosed {
		err := stream.CloseSend()
		if err != nil {
			log.Errorf("%v", err)
		}
	}

	r.mutex.Unlock()

	r.forwardResponseStream(connName, stream)
}

// forwardResponseStream receives response frames from a backend stream and forwards them
// to the northbound server stream, according to the streaming mode of the call and whether
// this stream has been selected as the active responder.
func (r *request) forwardResponseStream(connName string, stream grpc.ClientStream) {
	var queuedFrames [][]byte
	frame := *r.responseFrame
	var err error
	activeStream := false
	for {
		err = stream.RecvMsg(&frame)
		// if this is an inactive responder, ignore everything it sends
		if err != nil && err.Error() == transactionNotAcquiredErrorString {
			break
		}
		// the first thread to reach this point (first to receive a response frame) will become the active stream
		r.activeResponseStreamOnce.Do(func() { activeStream = true })
		if err != nil {
			// this can be io.EOF which is the success case
			break
		}

		if r.isStreamingResponse {
			// streaming response - send immediately
			if err = r.sendResponseFrame(stream, frame); err != nil {
				break
			}
		} else { // !r.isStreamingResponse

			if r.isStreamingRequest { // && !r.isStreamingResponse
				// queue the frame (only send response when the last stream closes)
				queuedFrames = append(queuedFrames, frame.payload)
			} else { // !r.isStreamingRequest && !r.isStreamingResponse

				// only the active stream will respond
				if activeStream { // && !r.isStreamingRequest && !r.isStreamingResponse
					// send the response immediately
					if err = r.sendResponseFrame(stream, frame); err != nil {
						break
					}
				}
				// !activeStream && !r.isStreamingRequest && !r.isStreamingResponse
				// just read & discard until the stream dies
			}
		}
	}

	log.Debugf("Closing stream to %s", connName)

	// io.EOF is the success case
	if err == io.EOF {
		err = nil
	}

	// this double-lock sets off alarm bells in my head
	r.backend.mutex.Lock()
	r.mutex.Lock()
	delete(r.streams, connName)
	streamsLeft := len(r.streams)

	// handle the case where no cores are the active responder. Should never happen, but just in case...
	if streamsLeft == 0 {
		r.activeResponseStreamOnce.Do(func() { activeStream = true })
	}

	// if this is the active stream (for non-streaming requests), or this is the last stream (for streaming requests)
	if (activeStream && !r.isStreamingRequest && !r.isStreamingResponse) || (streamsLeft == 0 && (r.isStreamingRequest || r.isStreamingResponse)) {
		// request is complete, cleanup
		delete(r.backend.activeRequests, r)
		r.mutex.Unlock()
		r.backend.mutex.Unlock()

		// send any queued frames we have (streaming request & !streaming response only, but no harm trying in other cases)
		for _, payload := range queuedFrames {
			if err != nil {
				// if there's been an error, don't try to send anymore
				break
			}
			frame.payload = payload
			err = r.sendResponseFrame(stream, frame)
		}

		// We may have received Trailers as part of the call.
		r.serverStream.SetTrailer(stream.Trailer())

		// response stream complete
		r.responseErrChan <- err
	} else {
		r.mutex.Unlock()
		r.backend.mutex.Unlock()
	}
}

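// sendResponseFrame forwards a single response frame to the northbound server stream,
// sending the response headers (including any routing metadata) exactly once before the
// first frame is written.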
func (r *request) sendResponseFrame(stream grpc.ClientStream, f responseFrame) error {
	r.responseStreamMutex.Lock()
	defer r.responseStreamMutex.Unlock()

	// the header should only be set once, even if multiple streams can respond.
	setHeader := false
	r.setResponseHeaderOnce.Do(func() { setHeader = true })
	if setHeader {
		// This is a bit of a hack, but client to server headers are only readable after first client msg is
		// received but must be written to server stream before the first msg is flushed.
		// This is the only place to do it nicely.
		md, err := stream.Header()
		if err != nil {
			return err
		}
		// Update the metadata for the response.
		if f.metaKey != NoMeta {
			if f.metaVal == "" {
				// We could also always just do this
				md.Set(f.metaKey, f.backend.name)
			} else {
				md.Set(f.metaKey, f.metaVal)
			}
		}
		if err := r.serverStream.SendHeader(md); err != nil {
			return err
		}
	}

	log.Debugf("Response frame %s", hex.EncodeToString(f.payload))

	return r.serverStream.SendMsg(&f)
}

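// sendAll forwards a request frame to every currently open backend stream. Frames of
// non-streaming requests are also buffered so that late-arriving streams can be caught
// up. The call succeeds if at least one stream accepted the frame.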
func (r *request) sendAll(frame *requestFrame) error {
	r.mutex.Lock()
	if !r.isStreamingRequest {
		// save frames of non-streaming requests, so we can catchup new streams
		r.requestFrameBacklog = append(r.requestFrameBacklog, frame.payload)
	}

	// send to all existing streams
	streams := make(map[string]grpc.ClientStream, len(r.streams))
	for n, s := range r.streams {
		streams[n] = s
	}
	r.mutex.Unlock()

	var rtrn error
	atLeastOne := false
	atLeastOneSuccess := false
	for _, stream := range streams {
		if err := stream.SendMsg(frame); err != nil {
			log.Debugf("Error on SendMsg: %s", err.Error())
			rtrn = err
		} else {
			atLeastOneSuccess = true
		}
		atLeastOne = true
	}
	// If at least one of the streams succeeded, declare success;
	// if none did, pick an error and return it.
	if atLeastOne {
		if atLeastOneSuccess {
			return nil
		} else {
			return rtrn
		}
	} else {
		err := errors.New("unable to send, all streams have closed")
		log.Error(err)
		return err
	}
}

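// forwardRequestStream reads request frames from the northbound server stream and fans
// them out to all backend streams via sendAll, closing the southbound send direction once
// the client has finished sending. io.EOF from the server stream is the normal termination.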
func (r *request) forwardRequestStream(src grpc.ServerStream) error {
	// The frame buffer already has the results of a first
	// RecvMsg in it so the first thing to do is to
	// send it to the list of client streams and only
	// then read some more.
	frame := *r.requestFrame // local copy of frame
	var rtrn error
	for {
		// Send the message to each of the backend streams
		if err := r.sendAll(&frame); err != nil {
			log.Debugf("SendAll failed %s", err.Error())
			rtrn = err
			break
		}
		log.Debugf("Request frame %s", hex.EncodeToString(frame.payload))
		if err := src.RecvMsg(&frame); err != nil {
			rtrn = err // this can be io.EOF, which is the happy case
			break
		}
	}

	r.mutex.Lock()
	log.Debug("Closing southbound streams")
	r.sendClosed = true
	for _, stream := range r.streams {
		err := stream.CloseSend()
		if err != nil {
			log.Errorf("%v", err)
		}
	}
	r.mutex.Unlock()

	if rtrn != io.EOF {
		log.Debugf("s2cErr reporting %v", rtrn)
		return rtrn
	}
	log.Debug("s2cErr reporting EOF")
	// this is the successful case where the sender has encountered io.EOF, and won't be sending anymore.
	// the clientStream>serverStream may continue sending though.
	return nil
}