/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// gRPC affinity router with active/active backends

package afrouter

// Backend manager handles redundant connections per backend

import (
	"errors"
	"fmt"
	"io"
	"net"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/opencord/voltha-go/common/log"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/metadata"
)

const (
	BE_ACTIVE_ACTIVE = 1 // Backend type active/active
	BE_SERVER        = 2 // Backend type single server
	BE_SEQ_RR        = 0 // Backend sequence round robin
	AS_NONE          = 0 // Association strategy: none
	AS_SERIAL_NO     = 1 // Association strategy: serial number
	AL_NONE          = 0 // Association location: none
	AL_HEADER        = 1 // Association location: header
	AL_PROTOBUF      = 2 // Association location: protobuf
)

var beTypeNames = []string{"", "active_active", "server"}
var asTypeNames = []string{"", "serial_number"}
var alTypeNames = []string{"", "header", "protobuf"}

var bClusters map[string]*backendCluster = make(map[string]*backendCluster)

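// backendCluster groups a set of backends behind a single cluster name.
// beRvMap provides the reverse index lookup used for round-robin backend
// selection, and serialNoSource hands out monotonically increasing
// transaction serial numbers for active/active operation.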
type backendCluster struct {
	name string
	//backends map[string]*backend
	backends       []*backend
	beRvMap        map[*backend]int
	serialNoSource chan uint64
}

type backend struct {
	lck              sync.Mutex
	name             string
	beType           int
	activeAssoc      assoc
	connFailCallback func(string, *backend) bool
	connections      map[string]*beConnection
	srtdConns        []*beConnection
	opnConns         int
}

type assoc struct {
	strategy int
	location int
	field    string // Used only if location is protobuf
	key      string
}

type beConnection struct {
	lck   sync.Mutex
	cncl  context.CancelFunc
	name  string
	addr  string
	port  string
	gConn *gConnection
	bknd  *backend
}

// This structure should never be referred to
// by any routine outside of *beConnection
// routines.
type gConnection struct {
	lck   sync.Mutex
	state connectivity.State
	conn  *grpc.ClientConn
	cncl  context.CancelFunc
}

type beClStrm struct {
	strm     grpc.ClientStream
	ctxt     context.Context
	cncl     context.CancelFunc
	ok2Close chan struct{}
	c2sRtrn  chan error
	s2cRtrn  error
}

type beClStrms struct {
	lck       sync.Mutex
	actvStrm  *beClStrm
	strms     map[string]*beClStrm
	srtdStrms []*beClStrm
}

//***************************************************************//
//****************** BackendCluster Functions *******************//
//***************************************************************//

//TODO: Move the backend type (active/active etc) to the cluster
// level. All backends should really be of the same type.
// Create a new backend cluster
func newBackendCluster(conf *BackendClusterConfig) (*backendCluster, error) {
	var err error = nil
	var rtrn_err bool = false
	var be *backend
	log.Debugf("Creating a backend cluster with %v", conf)
	// Validate the configuration
	if conf.Name == "" {
		log.Error("A backend cluster must have a name")
		rtrn_err = true
	}
	//bc := &backendCluster{name:conf.Name,backends:make(map[string]*backend)}
	bc := &backendCluster{name: conf.Name, beRvMap: make(map[*backend]int)}
	bClusters[bc.name] = bc
	bc.startSerialNumberSource() // Serial number source for active/active backends
	idx := 0
	for _, bec := range conf.Backends {
		if bec.Name == "" {
			log.Errorf("A backend must have a name in cluster %s\n", conf.Name)
			rtrn_err = true
		}
		if be, err = newBackend(&bec, conf.Name); err != nil {
			log.Errorf("Error creating backend %s", bec.Name)
			rtrn_err = true
		}
		bc.backends = append(bc.backends, be)
		bc.beRvMap[bc.backends[idx]] = idx
		idx++
	}
	if rtrn_err {
		return nil, errors.New("Error creating backend(s)")
	}
	return bc, nil
}

func (bc *backendCluster) getBackend(name string) *backend {
	for _, v := range bc.backends {
		if v.name == name {
			return v
		}
	}
	return nil
}

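// startSerialNumberSource starts a goroutine that feeds an ever-increasing
// counter into bc.serialNoSource. Each northbound transaction pulls one
// value from the channel so that redundant active/active backends can
// correlate duplicate requests.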
func (bc *backendCluster) startSerialNumberSource() {
	bc.serialNoSource = make(chan uint64)
	var counter uint64 = 0
	// This go routine only dies on exit, it is not a leak
	go func() {
		for {
			bc.serialNoSource <- counter
			counter++
		}
	}()
}

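// nextBackend returns the next backend to try after 'be', according to the
// requested sequencing policy. For round robin it walks the backend list
// starting after the previous backend and returns the first one with at
// least one open connection; if it wraps all the way around without finding
// one, it returns an error.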
func (bc *backendCluster) nextBackend(be *backend, seq int) (*backend, error) {
	switch seq {
	case BE_SEQ_RR: // Round robin
		in := be
		// If no backend is found having a connection
		// then return nil.
		if be == nil {
			log.Debug("Previous backend is nil")
			be = bc.backends[0]
			in = be
			if be.opnConns != 0 {
				return be, nil
			}
		}
		for {
			log.Debugf("Requesting a new backend starting from %s", be.name)
			cur := bc.beRvMap[be]
			cur++
			if cur >= len(bc.backends) {
				cur = 0
			}
			log.Debugf("Next backend is %d:%s", cur, bc.backends[cur].name)
			if bc.backends[cur].opnConns > 0 {
				return bc.backends[cur], nil
			}
			if bc.backends[cur] == in {
				err := fmt.Errorf("No backend with open connections found")
				log.Debug(err)
				return nil, err
			}
			be = bc.backends[cur]
			log.Debugf("Backend '%s' has no open connections, trying next", bc.backends[cur].name)
		}
	default: // Invalid, default to round robin
		log.Errorf("Invalid backend sequence %d. Defaulting to round robin", seq)
		return bc.nextBackend(be, BE_SEQ_RR)
	}
}

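// handler is the entry point for a northbound call that has been routed to
// this cluster. It allocates a serial-number-carrying northbound frame,
// lets the router pick a backend based on the first received message, and
// then hands the server stream to that backend for proxying.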
func (bec *backendCluster) handler(srv interface{}, serverStream grpc.ServerStream, r Router, mthdSlice []string,
	mk string, mv string) error {
	//func (bec *backendCluster) handler(nbR * nbRequest) error {

	// The final backend cluster needs to be determined here. With non-affinity routed backends it could
	// just be determined here and for affinity routed backends the first message must be received
	// before the backend is determined. In order to keep things simple, the same approach is taken for
	// now.

	// Get the backend to use.
	// Allocate the nbFrame here since it holds the "context" of this communication
	nf := &nbFrame{router: r, mthdSlice: mthdSlice, serNo: bec.serialNoSource, metaKey: mk, metaVal: mv}
	log.Debugf("Nb frame allocated with method %s", nf.mthdSlice[REQ_METHOD])

	if be, err := bec.assignBackend(serverStream, nf); err != nil {
		// At this point, no backend streams have been initiated
		// so just return the error.
		return err
	} else {
		log.Debugf("Backend '%s' selected", be.name)
		// Allocate a sbFrame here because it might be needed for return value intercept
		sf := &sbFrame{router: r, be: be, method: nf.mthdSlice[REQ_METHOD], metaKey: mk, metaVal: mv}
		log.Debugf("Sb frame allocated with router %s", r.Name())
		return be.handler(srv, serverStream, nf, sf)
	}
}

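// openSouthboundStreams creates one client stream per ready connection on
// the backend, copying the incoming metadata and the allocated transaction
// serial number into each outgoing context. It succeeds if at least one
// stream could be opened; otherwise it returns an error describing every
// failed connection.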
func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f *nbFrame) (*beClStrms, error) {

	rtrn := &beClStrms{strms: make(map[string]*beClStrm), actvStrm: nil}

	log.Debugf("Opening southbound streams for method '%s'", f.mthdSlice[REQ_METHOD])
	// Get the metadata from the incoming message on the server
	md, ok := metadata.FromIncomingContext(serverStream.Context())
	if !ok {
		return nil, errors.New("Could not get the server stream metadata")
	}

	// TODO: Need to check if this is an active/active backend cluster
	// with a serial number in the header.
	serialNo := <-f.serNo
	log.Debugf("Serial number for transaction allocated: %d", serialNo)
	// If even one stream can be created then proceed. If none can be
	// created then report an error because both the primary and redundant
	// connections are non-existent.
	var atLeastOne bool = false
	var errStr strings.Builder
	log.Debugf("There are %d connections to open", len(be.connections))
	for _, cn := range be.srtdConns {
		// TODO: THIS IS A HACK to suspend redundancy for binding routers for all calls
		// and it's very specific to a use case. There should really be a per method
		// mechanism to select non-redundant calls for all router types. This needs
		// to be fixed ASAP. The overrides should be used for this, the implementation
		// is simple, and it can be done here.
		if atLeastOne == true && f.metaKey != NoMeta {
			// Don't open any more southbound streams
			log.Debugf("Not opening any more SB streams, metaKey = %s", f.metaKey)
			rtrn.strms[cn.name] = nil
			continue
		}
		// Copy in the metadata
		if cn.getState() == connectivity.Ready && cn.getConn() != nil {
			log.Debugf("Opening southbound stream for connection '%s'", cn.name)
			// Create an outgoing context that includes the incoming metadata
			// and that will cancel if the server's context is canceled
			clientCtx, clientCancel := context.WithCancel(serverStream.Context())
			clientCtx = metadata.NewOutgoingContext(clientCtx, md.Copy())
			//TODO: Same check here, only add the serial number if necessary
			clientCtx = metadata.AppendToOutgoingContext(clientCtx, "voltha_serial_number",
				strconv.FormatUint(serialNo, 10))
			// Create the client stream
			if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying,
				cn.getConn(), f.mthdSlice[REQ_ALL]); err != nil {
				log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
				fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err)
				rtrn.strms[cn.name] = nil
			} else {
				rtrn.strms[cn.name] = &beClStrm{strm: clientStream, ctxt: clientCtx,
					cncl: clientCancel, s2cRtrn: nil,
					ok2Close: make(chan struct{}),
					c2sRtrn:  make(chan error, 1)}
				atLeastOne = true
			}
		} else if cn.getConn() == nil {
			err := errors.New(fmt.Sprintf("Connection '%s' is closed", cn.name))
			fmt.Fprint(&errStr, err.Error())
			log.Debug(err)
		} else {
			err := errors.New(fmt.Sprintf("Connection '%s' isn't ready", cn.name))
			fmt.Fprint(&errStr, err.Error())
			log.Debug(err)
		}
	}
	if atLeastOne == true {
		rtrn.sortStreams()
		return rtrn, nil
	}
	fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ", be.name)
	log.Error(errStr.String())
	return nil, errors.New(errStr.String())
}

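// handler proxies a single northbound call onto this backend. It opens the
// southbound streams, forwards messages in both directions, and returns
// once both the server-to-client and client-to-server forwarders have
// reported completion (or an error).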
func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *nbFrame, sf *sbFrame) error {

	// Set up and launch each individual southbound stream
	var beStrms *beClStrms
	var rtrn error = nil
	var s2cOk bool = false
	var c2sOk bool = false

	beStrms, err := be.openSouthboundStreams(srv, serverStream, nf)
	if err != nil {
		log.Errorf("openStreams failed: %v", err)
		return err
	}
	// If we get here, there has to be AT LEAST ONE open stream

	// *Do not explicitly close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
	// Channels do not have to be closed, it is just a control flow mechanism, see
	// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ

	log.Debug("Starting server to client forwarding")
	s2cErrChan := beStrms.forwardServerToClient(serverStream, nf)

	log.Debug("Starting client to server forwarding")
	c2sErrChan := beStrms.forwardClientToServer(serverStream, sf)

	// We don't know which side is going to stop sending first, so we need a select between the two.
	for i := 0; i < 2; i++ {
		select {
		case s2cErr := <-s2cErrChan:
			s2cOk = true
			log.Debug("Processing s2cErr")
			if s2cErr == io.EOF {
				log.Debug("s2cErr reporting EOF")
				// this is the successful case where the sender has encountered io.EOF, and won't be sending anymore.
				// the clientStream>serverStream may continue sending though.
				defer beStrms.closeSend()
				if c2sOk == true {
					return rtrn
				}
			} else {
				log.Debugf("s2cErr reporting %v", s2cErr)
				// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
				// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
				// exit with an error to the stack
				beStrms.clientCancel()
				return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
			}
		case c2sErr := <-c2sErrChan:
			c2sOk = true
			log.Debug("Processing c2sErr")
			// This happens when the clientStream has nothing else to offer (io.EOF) or returned a gRPC error. In those two
			// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
			// will be nil.
			serverStream.SetTrailer(beStrms.trailer())
			// c2sErr will contain an RPC error from client code. If not io.EOF, return the RPC error as the server stream error.
			// NOTE!!! with redundant backends, it's likely that one of the backends
			// returns a response before all the data has been sent southbound and
			// the southbound streams are closed. Should this happen one of the
			// backends may not get the request.
			if c2sErr != io.EOF {
				rtrn = c2sErr
			}
			log.Debug("c2sErr reporting EOF")
			if s2cOk == true {
				return rtrn
			}
		}
	}
	return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}

func (strms *beClStrms) clientCancel() {
	for _, strm := range strms.strms {
		if strm != nil {
			strm.cncl()
		}
	}
}

func (strms *beClStrms) closeSend() {
	for _, strm := range strms.strms {
		if strm != nil {
			<-strm.ok2Close
			log.Debug("Closing southbound stream")
			strm.strm.CloseSend()
		}
	}
}

func (strms *beClStrms) trailer() metadata.MD {
	return strms.actvStrm.strm.Trailer()
}

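// assignBackend receives the first message from the server stream, which
// drives the codec and the router to select a backend and record it in the
// frame. It returns an error if the call was not routable or if the chosen
// backend has no open connections.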
func (bec *backendCluster) assignBackend(src grpc.ServerStream, f *nbFrame) (*backend, error) {
	// Receive the first message from the server. This calls the assigned codec in which
	// Unmarshal gets executed. That will use the assigned router to select a backend
	// and add it to the frame
	if err := src.RecvMsg(f); err != nil {
		return nil, err
	}
	// Check that the backend was routable and actually has connections open.
	// If it doesn't then return a nil backend to indicate this
	if f.be == nil {
		err := fmt.Errorf("Unable to route method '%s'", f.mthdSlice[REQ_METHOD])
		log.Error(err)
		return nil, err
	} else if f.be.opnConns == 0 {
		err := fmt.Errorf("No open connections on backend '%s'", f.be.name)
		log.Error(err)
		return f.be, err
	}
	return f.be, nil
}

func (strms *beClStrms) getActive() *beClStrm {
	strms.lck.Lock()
	defer strms.lck.Unlock()
	return strms.actvStrm
}

func (strms *beClStrms) setThenGetActive(strm *beClStrm) *beClStrm {
	strms.lck.Lock()
	defer strms.lck.Unlock()
	if strms.actvStrm == nil {
		strms.actvStrm = strm
	}
	return strms.actvStrm
}

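// forwardClientToServer relays responses from the southbound (client)
// streams back to the northbound server stream. The first stream to return
// a message is latched as the active stream via setThenGetActive; responses
// from the remaining, redundant streams are discarded. The returned channel
// yields the terminal error of the active stream (io.EOF on success).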
func (src *beClStrms) forwardClientToServer(dst grpc.ServerStream, f *sbFrame) chan error {
	fc2s := func(srcS *beClStrm) {
		for i := 0; ; i++ {
			if err := srcS.strm.RecvMsg(f); err != nil {
				if src.setThenGetActive(srcS) == srcS {
					srcS.c2sRtrn <- err // this can be io.EOF which is the success case
				} else {
					srcS.c2sRtrn <- nil // Inactive responder
				}
				close(srcS.ok2Close)
				break
			}
			if src.setThenGetActive(srcS) != srcS {
				srcS.c2sRtrn <- nil
				continue
			}
			if i == 0 {
				// This is a bit of a hack, but client to server headers are only readable after the first client msg is
				// received, yet they must be written to the server stream before the first msg is flushed.
				// This is the only place to do it nicely.
				md, err := srcS.strm.Header()
				if err != nil {
					srcS.c2sRtrn <- err
					break
				}
				// Update the metadata for the response.
				if f.metaKey != NoMeta {
					if f.metaVal == "" {
						// We could also always just do this
						md.Set(f.metaKey, f.be.name)
					} else {
						md.Set(f.metaKey, f.metaVal)
					}
				}
				if err := dst.SendHeader(md); err != nil {
					srcS.c2sRtrn <- err
					break
				}
			}
			log.Debugf("Northbound frame %v", f.payload)
			if err := dst.SendMsg(f); err != nil {
				srcS.c2sRtrn <- err
				break
			}
		}
	}

	// There should be AT LEAST one open stream at this point.
	// If there isn't, it's a grave error in the code and it will
	// cause this thread to block here, so check for it and
	// don't let the lock-up happen but report the error.
	ret := make(chan error, 1)
	agg := make(chan *beClStrm)
	atLeastOne := false
	for _, strm := range src.strms {
		if strm != nil {
			go fc2s(strm)
			go func(s *beClStrm) { // Wait on result and aggregate
				r := <-s.c2sRtrn // got the return code
				if r == nil {
					return // We're the redundant stream, just die
				}
				s.c2sRtrn <- r // put it back to pass it along
				agg <- s       // send the stream to the aggregator
			}(strm)
			atLeastOne = true
		}
	}
	if atLeastOne == true {
		go func() { // Wait on aggregated result
			s := <-agg
			ret <- <-s.c2sRtrn
		}()
	} else {
		err := errors.New("There are no open streams. Unable to forward message.")
		log.Error(err)
		ret <- err
	}
	return ret
}

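// sendAll writes the frame to every open southbound stream in sorted order.
// Per-stream send errors are recorded on the stream; the call is considered
// successful if at least one stream accepted the message without error.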
func (strms *beClStrms) sendAll(f *nbFrame) error {
	var rtrn error

	atLeastOne := false
	for _, strm := range strms.srtdStrms {
		if strm != nil {
			if err := strm.strm.SendMsg(f); err != nil {
				log.Debugf("Error on SendMsg: %s", err.Error())
				strm.s2cRtrn = err
			}
			atLeastOne = true
		} else {
			log.Debugf("Nil stream")
		}
	}
	// If one of the streams succeeded, declare success
	// if none did pick an error and return it.
	if atLeastOne == true {
		for _, strm := range strms.srtdStrms {
			if strm != nil {
				rtrn = strm.s2cRtrn
				if rtrn == nil {
					return rtrn
				}
			}
		}
		return rtrn
	} else {
		rtrn = errors.New("There are no open streams, this should never happen")
		log.Error(rtrn)
	}
	return rtrn
}

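// forwardServerToClient relays requests from the northbound server stream to
// all southbound streams. The frame already holds the first received message
// (consumed by assignBackend), so it is sent before the next RecvMsg. The
// returned channel yields the terminal error (io.EOF on a clean shutdown).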
func (dst *beClStrms) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error {
	ret := make(chan error, 1)
	go func() {
		// The frame buffer already has the results of a first
		// RecvMsg in it so the first thing to do is to
		// send it to the list of client streams and only
		// then read some more.
		for i := 0; ; i++ {
			// Send the message to each of the backend streams
			if err := dst.sendAll(f); err != nil {
				ret <- err
				log.Debugf("SendAll failed %s", err.Error())
				break
			}
			log.Debugf("Southbound frame %v", f.payload)
			if err := src.RecvMsg(f); err != nil {
				ret <- err // this can be io.EOF which is the happy case
				break
			}
		}
	}()
	return ret
}

func (st *beClStrms) sortStreams() {
	var tmpKeys []string
	for k := range st.strms {
		tmpKeys = append(tmpKeys, k)
	}
	sort.Strings(tmpKeys)
	for _, v := range tmpKeys {
		st.srtdStrms = append(st.srtdStrms, st.strms[v])
	}
}

func (be *backend) sortConns() {
	var tmpKeys []string
	for k := range be.connections {
		tmpKeys = append(tmpKeys, k)
	}
	sort.Strings(tmpKeys)
	for _, v := range tmpKeys {
		be.srtdConns = append(be.srtdConns, be.connections[v])
	}
}

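// newBackend validates a backend's configuration (type, association
// strategy/location/field/key, and connections), builds the beConnection
// objects, sorts them for deterministic active/active ordering, and kicks
// off the initial connection attempts.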
func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
	var rtrn_err bool = false

	log.Debugf("Configuring the backend with %v", *conf)
	// Validate the config and configure the backend
	be := &backend{name: conf.Name, connections: make(map[string]*beConnection), opnConns: 0}
	idx := strIndex([]string(beTypeNames), conf.Type)
	if idx == 0 {
		log.Errorf("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.beType = idx

	idx = strIndex(asTypeNames, conf.Association.Strategy)
	if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
		log.Errorf("An association strategy must be provided if the backend "+
			"type is active/active for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.activeAssoc.strategy = idx

	idx = strIndex(alTypeNames, conf.Association.Location)
	if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
		log.Errorf("An association location must be provided if the backend "+
			"type is active/active for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.activeAssoc.location = idx

	if conf.Association.Field == "" && be.activeAssoc.location == AL_PROTOBUF {
		log.Errorf("An association field must be provided if the backend "+
			"type is active/active and the location is set to protobuf "+
			"for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.activeAssoc.field = conf.Association.Field

	if conf.Association.Key == "" && be.activeAssoc.location == AL_HEADER {
		log.Errorf("An association key must be provided if the backend "+
			"type is active/active and the location is set to header "+
			"for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.activeAssoc.key = conf.Association.Key
	if rtrn_err {
		return nil, errors.New("Backend configuration failed")
	}
	// Configure the connections
	// Connections can consist of just a name. This allows for dynamic configuration
	// at a later time.
	// TODO: validate that there is one connection for all but active/active backends
	if len(conf.Connections) > 1 && be.beType != BE_ACTIVE_ACTIVE {
		log.Errorf("Only one connection must be specified if the backend "+
			"type is not set to 'active_active'")
		rtrn_err = true
	}
	if len(conf.Connections) == 0 {
		log.Errorf("At least one connection must be specified")
		rtrn_err = true
	}
	for _, cnConf := range conf.Connections {
		if cnConf.Name == "" {
			log.Errorf("A connection must have a name for backend %s in cluster %s",
				conf.Name, clusterName)
		} else {
			gc := &gConnection{conn: nil, cncl: nil, state: connectivity.Idle}
			be.connections[cnConf.Name] = &beConnection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, bknd: be, gConn: gc}
			if cnConf.Addr != "" { // An empty address means the connection will be specified later.
				if ip := net.ParseIP(cnConf.Addr); ip == nil {
					log.Errorf("The IP address for connection %s in backend %s in cluster %s is invalid",
						cnConf.Name, conf.Name, clusterName)
					rtrn_err = true
				}
				// Validate the port number. This just validates that it's a non-zero integer in range.
				if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
					log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
						cnConf.Port, cnConf.Name, conf.Name, clusterName)
					rtrn_err = true
				}
			}
		}
	}

	if rtrn_err {
		return nil, errors.New("Connection configuration failed")
	}
	// Create the sorted connection list for deterministic
	// active-active call orders.
	be.sortConns()
	// All is well, start the backend cluster connections
	be.connectAll()

	return be, nil
}

//***************************************************************//
//********************* Backend Functions ***********************//
//***************************************************************//

func (be *backend) incConn() {
	be.lck.Lock()
	defer be.lck.Unlock()
	be.opnConns++
}

func (be *backend) decConn() {
	be.lck.Lock()
	defer be.lck.Unlock()
	be.opnConns--
	if be.opnConns < 0 {
		log.Error("Internal error, number of open connections less than 0")
		be.opnConns = 0
	}
}

// Attempts to establish all the connections for a backend;
// any failures result in an abort. This should only be called
// on a first attempt to connect. Individual connections should be
// handled after that.
func (be *backend) connectAll() {
	for _, cn := range be.connections {
		cn.connect()
	}
}

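// connect dials the gRPC connection for this beConnection if an address has
// been configured and no connection currently exists. Dialing is
// non-blocking; on success a monitor goroutine tracks the connectivity
// state, and on failure the dial is retried after a delay.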
func (cn *beConnection) connect() {
	if cn.addr != "" && cn.getConn() == nil {
		log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name, cn.addr, cn.port)
		// Dial doesn't block, it just returns and continues connecting in the background.
		// Check back later to confirm and increase the connection count.
		ctx, cnclFnc := context.WithCancel(context.Background()) // Context for canceling the connection
		cn.setCncl(cnclFnc)
		if conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure()); err != nil {
			log.Errorf("Error dialing connection %v: %v", cn, err)
			cn.waitAndTryAgain(ctx)
		} else {
			cn.setConn(conn)
			log.Debugf("Starting the connection monitor for '%s'", cn.name)
			cn.monitor(ctx)
		}
	} else if cn.addr == "" {
		log.Infof("No address supplied for connection '%s', not connecting for now", cn.name)
	} else {
		log.Debugf("Connection '%s' is already connected, ignoring", cn.name)
	}
}

func (cn *beConnection) waitAndTryAgain(ctx context.Context) {
	go func(ctx context.Context) {
		ctxTm, cnclTm := context.WithTimeout(context.Background(), 10*time.Second)
		select {
		case <-ctxTm.Done():
			cnclTm()
			log.Debugf("Trying to connect '%s'", cn.name)
			// Connect creates a new context so cancel this one.
			cn.cancel()
			cn.connect()
			return
		case <-ctx.Done():
			cnclTm()
			return
		}
	}(ctx)
}

func (cn *beConnection) cancel() {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	log.Debugf("Canceling connection %s", cn.name)
	if cn.gConn != nil {
		if cn.gConn.cncl != nil {
			cn.gConn.cncl()
		} else {
			log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
		}
	} else {
		log.Errorf("Internal error, attempting to cancel on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) setCncl(cncl context.CancelFunc) {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		cn.gConn.cncl = cncl
	} else {
		log.Errorf("Internal error, attempting to set a cancel function on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) setConn(conn *grpc.ClientConn) {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		cn.gConn.conn = conn
	} else {
		log.Errorf("Internal error, attempting to set a connection on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) getConn() *grpc.ClientConn {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		return cn.gConn.conn
	}
	return nil
}

func (cn *beConnection) close() {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	log.Debugf("Closing connection %s", cn.name)
	if cn.gConn != nil && cn.gConn.conn != nil {
		if cn.gConn.conn.GetState() == connectivity.Ready {
			cn.bknd.decConn() // Decrease the connection reference
		}
		if cn.gConn.cncl != nil {
			cn.gConn.cncl() // Cancel the context first to force monitor functions to exit
		} else {
			log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
		}
		cn.gConn.conn.Close() // Close the connection
		// Now replace the gConn object with a new one as this one just
		// fades away as references to it are released after the close
		// finishes in the background.
		cn.gConn = &gConnection{conn: nil, cncl: nil, state: connectivity.TransientFailure}
	} else {
		log.Errorf("Internal error, attempt to close a nil connection object for '%s'", cn.name)
	}

}

func (cn *beConnection) setState(st connectivity.State) {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		cn.gConn.state = st
	} else {
		log.Errorf("Internal error, attempting to set connection state on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) getState() connectivity.State {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		if cn.gConn.conn != nil {
			return cn.gConn.conn.GetState()
		} else {
			log.Errorf("Internal error, attempting to get connection state on a nil connection: '%s'", cn.name)
		}
	} else {
		log.Errorf("Internal error, attempting to get connection state on a nil connection object: '%s'", cn.name)
	}
	// For lack of a better state to use. The logs will help determine what happened here.
	return connectivity.TransientFailure
}

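// monitor runs a goroutine that tracks the connectivity state of the
// underlying gRPC connection. It increments the backend's open-connection
// count when the connection becomes Ready, decrements it on transient
// failure, reconnects if the server sends GoAway (Idle), and exits when the
// connection is shut down or its context is canceled. While waiting for the
// connection to become ready it polls with an exponentially increasing
// delay.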
func (cn *beConnection) monitor(ctx context.Context) {
	bp := cn.bknd
	log.Debugf("Setting up monitoring for backend %s", bp.name)
	go func(ctx context.Context) {
		var delay time.Duration = 100 //ms
		for {
			//log.Debugf("****** Monitoring connection '%s' on backend '%s', %v", cn.name, bp.name, cn.conn)
			if cn.getState() == connectivity.Ready {
				log.Debugf("Connection '%s' on backend '%s' became ready", cn.name, bp.name)
				cn.setState(connectivity.Ready)
				bp.incConn()
				if cn.getConn() != nil && cn.getConn().WaitForStateChange(ctx, connectivity.Ready) == false {
					// The context was canceled. This is done by the close function
					// so just exit the routine
					log.Debugf("Context canceled for connection '%s' on backend '%s'", cn.name, bp.name)
					return
				}
				if cs := cn.getConn(); cs != nil {
					switch cs := cn.getState(); cs {
					case connectivity.TransientFailure:
						cn.setState(cs)
						bp.decConn()
						log.Infof("Transient failure for connection '%s' on backend '%s'", cn.name, bp.name)
						delay = 100
					case connectivity.Shutdown:
						// The connection was closed. The assumption here is that the closer
						// will manage the connection count and setting the conn to nil.
						// Exit the routine
						log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, bp.name)
						return
					case connectivity.Idle:
						// This can only happen if the server sends a GoAway. This can
						// only happen if the server has modified MaxConnectionIdle from
						// its default of infinity. The only solution here is to close the
						// connection and keepTrying()?
						//TODO: Read the grpc source code to see if there's a different approach
						log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, bp.name)
						cn.close()
						cn.connect()
						return
					}
				} else { // A nil means something went horribly wrong, error and exit.
					log.Errorf("Something horrible happened, the connection is nil and shouldn't be for connection %s", cn.name)
					return
				}
			} else {
				log.Debugf("Waiting for connection '%s' on backend '%s' to become ready", cn.name, bp.name)
				ctxTm, cnclTm := context.WithTimeout(context.Background(), delay*time.Millisecond)
				if delay < 30000 {
					delay += delay
				}
				select {
				case <-ctxTm.Done():
					cnclTm() // Doubt this is required but it's harmless.
					// Do nothing but let the loop continue
				case <-ctx.Done():
					// Context was closed, close and exit routine
					//cn.close() NO! let the close be managed externally!
					return
				}
			}
		}
	}(ctx)
}

// Set a callback for connection failure notification
// This is currently not used.
func (bp *backend) setConnFailCallback(cb func(string, *backend) bool) {
	bp.connFailCallback = cb
}