Scott Baker | e7144bc | 2019-10-01 14:16:47 -0700 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2019-present Open Networking Foundation |
| 3 | |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package afrouter |
| 18 | |
| 19 | import ( |
| 20 | "context" |
| 21 | "encoding/hex" |
| 22 | "errors" |
| 23 | "github.com/opencord/voltha-go/common/log" |
| 24 | "google.golang.org/grpc" |
| 25 | "google.golang.org/grpc/codes" |
| 26 | "google.golang.org/grpc/status" |
| 27 | "io" |
| 28 | "sync" |
| 29 | ) |
| 30 | |
| 31 | type request struct { |
| 32 | mutex sync.Mutex |
| 33 | activeResponseStreamOnce sync.Once |
| 34 | setResponseHeaderOnce sync.Once |
| 35 | responseStreamMutex sync.Mutex |
| 36 | |
| 37 | streams map[string]grpc.ClientStream |
| 38 | |
| 39 | requestFrameBacklog [][]byte |
| 40 | responseErrChan chan error |
| 41 | sendClosed bool |
| 42 | |
| 43 | backend *backend |
| 44 | ctx context.Context |
| 45 | serverStream grpc.ServerStream |
| 46 | methodInfo methodDetails |
| 47 | requestFrame *requestFrame |
| 48 | responseFrame *responseFrame |
| 49 | isStreamingRequest bool |
| 50 | isStreamingResponse bool |
| 51 | } |
| 52 | |
| 53 | var transactionNotAcquiredErrorString = status.Error(codes.Canceled, "transaction-not-acquired").Error() |
| 54 | |
| 55 | // catchupRequestStreamThenForwardResponseStream must be called with request.mutex pre-locked |
| 56 | func (r *request) catchupRequestStreamThenForwardResponseStream(connName string, stream grpc.ClientStream) { |
| 57 | r.streams[connName] = stream |
| 58 | |
| 59 | // prime new streams with any traffic they might have missed (non-streaming requests only) |
| 60 | frame := *r.requestFrame // local copy of frame |
| 61 | for _, payload := range r.requestFrameBacklog { |
| 62 | frame.payload = payload |
| 63 | if err := stream.SendMsg(&frame); err != nil { |
| 64 | log.Debugf("Error on SendMsg: %s", err.Error()) |
| 65 | break |
| 66 | } |
| 67 | } |
| 68 | if r.sendClosed { |
| 69 | stream.CloseSend() |
| 70 | } |
| 71 | |
| 72 | r.mutex.Unlock() |
| 73 | |
| 74 | r.forwardResponseStream(connName, stream) |
| 75 | } |
| 76 | |
| 77 | // forwardResponseStream forwards the response stream |
| 78 | func (r *request) forwardResponseStream(connName string, stream grpc.ClientStream) { |
| 79 | var queuedFrames [][]byte |
| 80 | frame := *r.responseFrame |
| 81 | var err error |
| 82 | activeStream := false |
| 83 | for { |
| 84 | err = stream.RecvMsg(&frame) |
| 85 | // if this is an inactive responder, ignore everything it sends |
| 86 | if err != nil && err.Error() == transactionNotAcquiredErrorString { |
| 87 | break |
| 88 | } |
| 89 | // the first thread to reach this point (first to receive a response frame) will become the active stream |
| 90 | r.activeResponseStreamOnce.Do(func() { activeStream = true }) |
| 91 | if err != nil { |
| 92 | // this can be io.EOF which is the success case |
| 93 | break |
| 94 | } |
| 95 | |
| 96 | if r.isStreamingResponse { |
| 97 | // streaming response - send immediately |
| 98 | if err = r.sendResponseFrame(stream, frame); err != nil { |
| 99 | break |
| 100 | } |
| 101 | } else { // !r.isStreamingResponse |
| 102 | |
| 103 | if r.isStreamingRequest { // && !r.isStreamingResponse |
| 104 | // queue the frame (only send response when the last stream closes) |
| 105 | queuedFrames = append(queuedFrames, frame.payload) |
| 106 | } else { // !r.isStreamingRequest && !r.isStreamingResponse |
| 107 | |
| 108 | // only the active stream will respond |
| 109 | if activeStream { // && !r.isStreamingRequest && !r.isStreamingResponse |
| 110 | // send the response immediately |
| 111 | if err = r.sendResponseFrame(stream, frame); err != nil { |
| 112 | break |
| 113 | } |
| 114 | } else { // !activeStream && !r.isStreamingRequest && !r.isStreamingResponse |
| 115 | // just read & discard until the stream dies |
| 116 | } |
| 117 | } |
| 118 | } |
| 119 | } |
| 120 | |
| 121 | log.Debugf("Closing stream to %s", connName) |
| 122 | |
| 123 | // io.EOF is the success case |
| 124 | if err == io.EOF { |
| 125 | err = nil |
| 126 | } |
| 127 | |
| 128 | // this double-lock sets off alarm bells in my head |
| 129 | r.backend.mutex.Lock() |
| 130 | r.mutex.Lock() |
| 131 | delete(r.streams, connName) |
| 132 | streamsLeft := len(r.streams) |
| 133 | |
| 134 | // handle the case where no cores are the active responder. Should never happen, but just in case... |
| 135 | if streamsLeft == 0 { |
| 136 | r.activeResponseStreamOnce.Do(func() { activeStream = true }) |
| 137 | } |
| 138 | |
| 139 | // if this the active stream (for non-streaming requests), or this is the last stream (for streaming requests) |
| 140 | if (activeStream && !r.isStreamingRequest && !r.isStreamingResponse) || (streamsLeft == 0 && (r.isStreamingRequest || r.isStreamingResponse)) { |
| 141 | // request is complete, cleanup |
| 142 | delete(r.backend.activeRequests, r) |
| 143 | r.mutex.Unlock() |
| 144 | r.backend.mutex.Unlock() |
| 145 | |
| 146 | // send any queued frames we have (streaming request & !streaming response only, but no harm trying in other cases) |
| 147 | for _, payload := range queuedFrames { |
| 148 | if err != nil { |
| 149 | // if there's been an error, don't try to send anymore |
| 150 | break |
| 151 | } |
| 152 | frame.payload = payload |
| 153 | err = r.sendResponseFrame(stream, frame) |
| 154 | } |
| 155 | |
| 156 | // We may have received Trailers as part of the call. |
| 157 | r.serverStream.SetTrailer(stream.Trailer()) |
| 158 | |
| 159 | // response stream complete |
| 160 | r.responseErrChan <- err |
| 161 | } else { |
| 162 | r.mutex.Unlock() |
| 163 | r.backend.mutex.Unlock() |
| 164 | } |
| 165 | } |
| 166 | |
| 167 | func (r *request) sendResponseFrame(stream grpc.ClientStream, f responseFrame) error { |
| 168 | r.responseStreamMutex.Lock() |
| 169 | defer r.responseStreamMutex.Unlock() |
| 170 | |
| 171 | // the header should only be set once, even if multiple streams can respond. |
| 172 | setHeader := false |
| 173 | r.setResponseHeaderOnce.Do(func() { setHeader = true }) |
| 174 | if setHeader { |
| 175 | // This is a bit of a hack, but client to server headers are only readable after first client msg is |
| 176 | // received but must be written to server stream before the first msg is flushed. |
| 177 | // This is the only place to do it nicely. |
| 178 | md, err := stream.Header() |
| 179 | if err != nil { |
| 180 | return err |
| 181 | } |
| 182 | // Update the metadata for the response. |
| 183 | if f.metaKey != NoMeta { |
| 184 | if f.metaVal == "" { |
| 185 | // We could also always just do this |
| 186 | md.Set(f.metaKey, f.backend.name) |
| 187 | } else { |
| 188 | md.Set(f.metaKey, f.metaVal) |
| 189 | } |
| 190 | } |
| 191 | if err := r.serverStream.SendHeader(md); err != nil { |
| 192 | return err |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | log.Debugf("Response frame %s", hex.EncodeToString(f.payload)) |
| 197 | |
| 198 | return r.serverStream.SendMsg(&f) |
| 199 | } |
| 200 | |
| 201 | func (r *request) sendAll(frame *requestFrame) error { |
| 202 | r.mutex.Lock() |
| 203 | if !r.isStreamingRequest { |
| 204 | // save frames of non-streaming requests, so we can catchup new streams |
| 205 | r.requestFrameBacklog = append(r.requestFrameBacklog, frame.payload) |
| 206 | } |
| 207 | |
| 208 | // send to all existing streams |
| 209 | streams := make(map[string]grpc.ClientStream, len(r.streams)) |
| 210 | for n, s := range r.streams { |
| 211 | streams[n] = s |
| 212 | } |
| 213 | r.mutex.Unlock() |
| 214 | |
| 215 | var rtrn error |
| 216 | atLeastOne := false |
| 217 | atLeastOneSuccess := false |
| 218 | for _, stream := range streams { |
| 219 | if err := stream.SendMsg(frame); err != nil { |
| 220 | log.Debugf("Error on SendMsg: %s", err.Error()) |
| 221 | rtrn = err |
| 222 | } else { |
| 223 | atLeastOneSuccess = true |
| 224 | } |
| 225 | atLeastOne = true |
| 226 | } |
| 227 | // If one of the streams succeeded, declare success |
| 228 | // if none did pick an error and return it. |
| 229 | if atLeastOne { |
| 230 | if atLeastOneSuccess { |
| 231 | return nil |
| 232 | } else { |
| 233 | return rtrn |
| 234 | } |
| 235 | } else { |
| 236 | err := errors.New("unable to send, all streams have closed") |
| 237 | log.Error(err) |
| 238 | return err |
| 239 | } |
| 240 | } |
| 241 | |
| 242 | func (r *request) forwardRequestStream(src grpc.ServerStream) error { |
| 243 | // The frame buffer already has the results of a first |
| 244 | // RecvMsg in it so the first thing to do is to |
| 245 | // send it to the list of client streams and only |
| 246 | // then read some more. |
| 247 | frame := *r.requestFrame // local copy of frame |
| 248 | var rtrn error |
| 249 | for { |
| 250 | // Send the message to each of the backend streams |
| 251 | if err := r.sendAll(&frame); err != nil { |
| 252 | log.Debugf("SendAll failed %s", err.Error()) |
| 253 | rtrn = err |
| 254 | break |
| 255 | } |
| 256 | log.Debugf("Request frame %s", hex.EncodeToString(frame.payload)) |
| 257 | if err := src.RecvMsg(&frame); err != nil { |
| 258 | rtrn = err // this can be io.EOF which is happy case |
| 259 | break |
| 260 | } |
| 261 | } |
| 262 | |
| 263 | r.mutex.Lock() |
| 264 | log.Debug("Closing southbound streams") |
| 265 | r.sendClosed = true |
| 266 | for _, stream := range r.streams { |
| 267 | stream.CloseSend() |
| 268 | } |
| 269 | r.mutex.Unlock() |
| 270 | |
| 271 | if rtrn != io.EOF { |
| 272 | log.Debugf("s2cErr reporting %v", rtrn) |
| 273 | return rtrn |
| 274 | } |
| 275 | log.Debug("s2cErr reporting EOF") |
| 276 | // this is the successful case where the sender has encountered io.EOF, and won't be sending anymore. |
| 277 | // the clientStream>serverStream may continue sending though. |
| 278 | return nil |
| 279 | } |