/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// gRPC affinity router with active/active backends

package afrouter

// Backend manager handles redundant connections per backend

import (
	"io"
	"fmt"
	"net"
	"sync"
	"time"
	"errors"
	"strconv"
	"strings"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/connectivity"
	"github.com/opencord/voltha-go/common/log"
)


const (
	BE_ACTIVE_ACTIVE = 1 // Backend type active/active
	BE_SERVER        = 2 // Backend type single server
	BE_SEQ_RR        = 0 // Backend sequence round robin
	AS_NONE          = 0 // Association strategy: none
	AS_SERIAL_NO     = 1 // Association strategy: serial number
	AL_NONE          = 0 // Association location: none
	AL_HEADER        = 1 // Association location: header
	AL_PROTOBUF      = 2 // Association location: protobuf
)


var beTypeNames = []string{"", "active_active", "server"}
var asTypeNames = []string{"", "serial_number"}
var alTypeNames = []string{"", "header", "protobuf"}
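
// Illustrative sketch: these slices map the configuration strings to the
// numeric constants above by index. The lookup in newBackend() works roughly
// as below, assuming strIndex() returns the index of its second argument in
// the slice and 0 when there is no match:
//
//	idx := strIndex(beTypeNames, "active_active") // idx == 1 == BE_ACTIVE_ACTIVE
//	idx = strIndex(beTypeNames, "bogus")          // idx == 0, reported as a config error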

var bClusters map[string]*backendCluster = make(map[string]*backendCluster)

type backendCluster struct {
	name string
	//backends map[string]*backend
	backends []*backend
	beRvMap map[*backend]int
	serialNoSource chan uint64
}

type backend struct {
	lck sync.Mutex
	name string
	beType int
	activeAssoc assoc
	connFailCallback func(string, *backend) bool
	connections map[string]*beConnection
	opnConns int
}

type assoc struct {
	strategy int
	location int
	field string // Used only if location is protobuf
}

type beConnection struct {
	lck sync.Mutex
	cncl context.CancelFunc
	name string
	addr string
	port string
	gConn *gConnection
	bknd *backend
}

// This structure should never be referred to
// by any routine outside of *beConnection
// routines.
type gConnection struct {
	lck sync.Mutex
	state connectivity.State
	conn *grpc.ClientConn
	cncl context.CancelFunc
}

type beClStrm struct {
	strm grpc.ClientStream
	ctxt context.Context
	cncl context.CancelFunc
	c2sRtrn chan error
	s2cRtrn error
}

type beClStrms struct {
	lck sync.Mutex
	actvStrm *beClStrm
	strms map[string]*beClStrm
}

//***************************************************************//
//****************** BackendCluster Functions *******************//
//***************************************************************//

//TODO: Move the backend type (active/active etc) to the cluster
// level. All backends should really be of the same type.
// Create a new backend cluster
func newBackendCluster(conf *BackendClusterConfig) (*backendCluster, error) {
	var err error = nil
	var rtrn_err bool = false
	var be *backend
	log.Debugf("Creating a backend cluster with %v", conf)
	// Validate the configuration
	if conf.Name == "" {
		log.Error("A backend cluster must have a name")
		rtrn_err = true
	}
	//bc := &backendCluster{name:conf.Name,backends:make(map[string]*backend)}
	bc := &backendCluster{name: conf.Name, beRvMap: make(map[*backend]int)}
	bClusters[bc.name] = bc
	bc.startSerialNumberSource() // Serial number source for active/active backends
	idx := 0
	for _, bec := range conf.Backends {
		if bec.Name == "" {
			log.Errorf("A backend must have a name in cluster %s\n", conf.Name)
			rtrn_err = true
		}
		if be, err = newBackend(&bec, conf.Name); err != nil {
			log.Errorf("Error creating backend %s", bec.Name)
			rtrn_err = true
		}
		bc.backends = append(bc.backends, be)
		bc.beRvMap[bc.backends[idx]] = idx
		idx++
	}
	if rtrn_err {
		return nil, errors.New("Error creating backend(s)")
	}
	return bc, nil
}
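
// Illustrative sketch: a hypothetical cluster configuration and its use. The
// sub-config type names (AssociationConfig, ConnectionConfig) are assumptions
// made for illustration; only fields actually referenced in this file are shown.
//
//	conf := &BackendClusterConfig{
//		Name: "vcore",
//		Backends: []BackendConfig{{
//			Name:        "vcore1",
//			Type:        "active_active",
//			Association: AssociationConfig{Strategy: "serial_number", Location: "header"},
//			Connections: []ConnectionConfig{
//				{Name: "vcore11", Addr: "10.0.0.1", Port: "50057"},
//				{Name: "vcore12", Addr: "10.0.0.2", Port: "50057"},
//			},
//		}},
//	}
//	if bc, err := newBackendCluster(conf); err == nil {
//		be := bc.getBackend("vcore1") // look the backend up by name later
//		_ = be
//	}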

func (bc *backendCluster) getBackend(name string) *backend {
	for _, v := range bc.backends {
		if v.name == name {
			return v
		}
	}
	return nil
}

func (bc *backendCluster) startSerialNumberSource() {
	bc.serialNoSource = make(chan uint64)
	var counter uint64 = 0
	// This go routine only dies on exit, it is not a leak
	go func() {
		for {
			bc.serialNoSource <- counter
			counter++
		}
	}()
}
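
// Illustrative sketch: each proxied transaction draws the next serial number
// by receiving from this channel (it is passed along as the nbFrame serNo
// field and read in openSouthboundStreams() below):
//
//	serialNo := <-bc.serialNoSource // yields 0, 1, 2, ... for the life of the process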

func (bc *backendCluster) nextBackend(be *backend, seq int) (*backend, error) {
	switch seq {
		case BE_SEQ_RR: // Round robin
			in := be
			// If no backend is found having a connection
			// then return nil.
			if be == nil {
				log.Debug("Previous backend is nil")
				be = bc.backends[0]
				in = be
				if be.opnConns != 0 {
					return be, nil
				}
			}
			for {
				log.Debugf("Requesting a new backend starting from %s", be.name)
				cur := bc.beRvMap[be]
				cur++
				if cur >= len(bc.backends) {
					cur = 0
				}
				log.Debugf("Next backend is %d:%s", cur, bc.backends[cur].name)
				if bc.backends[cur].opnConns > 0 {
					return bc.backends[cur], nil
				}
				if bc.backends[cur] == in {
					err := fmt.Errorf("No backend with open connections found")
					log.Debug(err)
					return nil, err
				}
				be = bc.backends[cur]
				log.Debugf("Backend '%s' has no open connections, trying next", bc.backends[cur].name)
			}
		default: // Invalid, default to round robin
			log.Errorf("Invalid backend sequence %d. Defaulting to round robin", seq)
			return bc.nextBackend(be, BE_SEQ_RR)
	}
}
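
// Illustrative sketch: round-robin selection walks forward from the previous
// backend and wraps around, returning the first backend that still has open
// connections:
//
//	be1, _ := bc.nextBackend(nil, BE_SEQ_RR) // first backend with open connections
//	be2, _ := bc.nextBackend(be1, BE_SEQ_RR) // the next one after be1, wrapping around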

func (bec *backendCluster) handler(srv interface{}, serverStream grpc.ServerStream, r Router, mthdSlice []string,
	mk string, mv string) error {
//func (bec *backendCluster) handler(nbR * nbRequest) error {

	// The final backend cluster needs to be determined here. With non-affinity routed backends it could
	// just be determined here and for affinity routed backends the first message must be received
	// before the backend is determined. In order to keep things simple, the same approach is taken for
	// now.

	// Get the backend to use.
	// Allocate the nbFrame here since it holds the "context" of this communication
	nf := &nbFrame{router: r, mthdSlice: mthdSlice, serNo: bec.serialNoSource, metaKey: mk, metaVal: mv}
	log.Debugf("Nb frame allocated with method %s", nf.mthdSlice[REQ_METHOD])

	if be, err := bec.assignBackend(serverStream, nf); err != nil {
		// At this point, no backend streams have been initiated
		// so just return the error.
		return err
	} else {
		log.Debugf("Backend '%s' selected", be.name)
		// Allocate a sbFrame here because it might be needed for return value intercept
		sf := &sbFrame{router: r, be: be, method: nf.mthdSlice[REQ_METHOD], metaKey: mk, metaVal: mv}
		log.Debugf("Sb frame allocated with router %s", r.Name())
		return be.handler(srv, serverStream, nf, sf)
	}
}

func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f *nbFrame) (*beClStrms, error) {

	rtrn := &beClStrms{strms: make(map[string]*beClStrm), actvStrm: nil}

	log.Debugf("Opening southbound streams for method '%s'", f.mthdSlice[REQ_METHOD])
	// Get the metadata from the incoming message on the server
	md, ok := metadata.FromIncomingContext(serverStream.Context())
	if !ok {
		return nil, errors.New("Could not get a server stream metadata")
	}

	// TODO: Need to check if this is an active/active backend cluster
	// with a serial number in the header.
	serialNo := <-f.serNo
	log.Debugf("Serial number for transaction allocated: %d", serialNo)
	// If even one stream can be created then proceed. If none can be
	// created then report an error because both the primary and redundant
	// connections are non-existent.
	var atLeastOne bool = false
	var errStr strings.Builder
	log.Debugf("There are %d connections to open", len(be.connections))
	for cnk, cn := range be.connections {
		// TODO: THIS IS A HACK to suspend redundancy for binding routers for all calls
		// and its very specific to a use case. There should really be a per method
		// mechanism to select non-redundant calls for all router types. This needs
		// to be fixed ASAP. The overrides should be used for this, the implementation
		// is simple, and it can be done here.
		if atLeastOne == true && f.metaKey != NoMeta {
			// Don't open any more southbound streams
			log.Debugf("Not opening any more SB streams, metaKey = %s", f.metaKey)
			rtrn.strms[cnk] = nil
			continue
		}
		// Copy in the metadata
		if cn.getState() == connectivity.Ready && cn.getConn() != nil {
			log.Debugf("Opening southbound stream for connection '%s'", cnk)
			// Create an outgoing context that includes the incoming metadata
			// and that will cancel if the server's context is canceled
			clientCtx, clientCancel := context.WithCancel(serverStream.Context())
			clientCtx = metadata.NewOutgoingContext(clientCtx, md.Copy())
			//TODO: Same check here, only add the serial number if necessary
			clientCtx = metadata.AppendToOutgoingContext(clientCtx, "voltha_serial_number",
				strconv.FormatUint(serialNo, 10))
			// Create the client stream
			if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying,
				cn.getConn(), f.mthdSlice[REQ_ALL]); err != nil {
				log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
				fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err)
				rtrn.strms[cnk] = nil
			} else {
				rtrn.strms[cnk] = &beClStrm{strm: clientStream, ctxt: clientCtx, cncl: clientCancel, s2cRtrn: nil,
					c2sRtrn: make(chan error, 1)}
				atLeastOne = true
			}
		} else if cn.getConn() == nil {
			err := fmt.Errorf("Connection '%s' is closed", cn.name)
			fmt.Fprint(&errStr, err.Error())
			log.Debug(err)
		} else {
			err := fmt.Errorf("Connection '%s' isn't ready", cn.name)
			fmt.Fprint(&errStr, err.Error())
			log.Debug(err)
		}
	}
	if atLeastOne == true {
		return rtrn, nil
	}
	fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ", be.name)
	log.Error(errStr.String())
	return nil, errors.New(errStr.String())
}

func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *nbFrame, sf *sbFrame) error {

	// Set up and launch each individual southbound stream
	var beStrms *beClStrms

	beStrms, err := be.openSouthboundStreams(srv, serverStream, nf)
	if err != nil {
		log.Errorf("openStreams failed: %v", err)
		return err
	}
	// If we get here, there has to be AT LEAST ONE open stream

	// *Do not explicitly close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
	// Channels do not have to be closed, it is just a control flow mechanism, see
	// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ

	log.Debug("Starting server to client forwarding")
	s2cErrChan := beStrms.forwardServerToClient(serverStream, nf)

	log.Debug("Starting client to server forwarding")
	c2sErrChan := beStrms.forwardClientToServer(serverStream, sf)

	// We don't know which side is going to stop sending first, so we need a select between the two.
	for i := 0; i < 2; i++ {
		select {
			case s2cErr := <-s2cErrChan:
				log.Debug("Processing s2cErr")
				if s2cErr == io.EOF {
					log.Debug("s2cErr reporting EOF")
					// this is the successful case where the sender has encountered io.EOF, and won't be sending anymore.
					// the clientStream>serverStream may continue sending though.
					beStrms.closeSend()
					break
				} else {
					log.Debugf("s2cErr reporting %v", s2cErr)
					// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
					// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
					// exit with an error to the stack
					beStrms.clientCancel()
					return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
				}
			case c2sErr := <-c2sErrChan:
				log.Debug("Processing c2sErr")
				// This happens when the clientStream has nothing else to offer (io.EOF) or has returned a gRPC error. In those two
				// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
				// will be nil.
				serverStream.SetTrailer(beStrms.trailer())
				// c2sErr will contain the RPC error from client code. If not io.EOF return the RPC error as the server stream error.
				if c2sErr != io.EOF {
					return c2sErr
				}
				return nil
		}
	}
	return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}

func (strms *beClStrms) clientCancel() {
	for _, strm := range strms.strms {
		if strm != nil {
			strm.cncl()
		}
	}
}

func (strms *beClStrms) closeSend() {
	for _, strm := range strms.strms {
		if strm != nil {
			log.Debug("Closing southbound stream")
			strm.strm.CloseSend()
		}
	}
}

func (strms *beClStrms) trailer() metadata.MD {
	return strms.actvStrm.strm.Trailer()
}

func (bec *backendCluster) assignBackend(src grpc.ServerStream, f *nbFrame) (*backend, error) {
	// Receive the first message from the server. This calls the assigned codec in which
	// Unmarshal gets executed. That will use the assigned router to select a backend
	// and add it to the frame
	if err := src.RecvMsg(f); err != nil {
		return nil, err
	}
	// Check that the backend was routable and actually has connections open.
	// If it doesn't then return a nil backend to indicate this
	if f.be == nil {
		err := fmt.Errorf("Unable to route method '%s'", f.mthdSlice[REQ_METHOD])
		log.Error(err)
		return nil, err
	} else if f.be.opnConns == 0 {
		err := fmt.Errorf("No open connections on backend '%s'", f.be.name)
		log.Error(err)
		return f.be, err
	}
	return f.be, nil
}

func (strms *beClStrms) getActive() *beClStrm {
	strms.lck.Lock()
	defer strms.lck.Unlock()
	return strms.actvStrm
}

func (strms *beClStrms) setThenGetActive(strm *beClStrm) *beClStrm {
	strms.lck.Lock()
	defer strms.lck.Unlock()
	if strms.actvStrm == nil {
		strms.actvStrm = strm
	}
	return strms.actvStrm
}
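
// Illustrative sketch: setThenGetActive() is a test-and-set under the lock. The
// first southbound stream to respond becomes the active stream for the call and
// every later caller simply observes that choice, as forwardClientToServer() does:
//
//	if strms.setThenGetActive(srcS) == srcS {
//		// srcS won the race; its messages and errors are forwarded northbound
//	}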

func (src *beClStrms) forwardClientToServer(dst grpc.ServerStream, f *sbFrame) chan error {
	fc2s := func(srcS *beClStrm) {
		for i := 0; ; i++ {
			if err := srcS.strm.RecvMsg(f); err != nil {
				if src.setThenGetActive(srcS) == srcS {
					srcS.c2sRtrn <- err // this can be io.EOF which is the success case
				} else {
					srcS.c2sRtrn <- nil // Inactive responder
				}
				break
			}
			if src.setThenGetActive(srcS) != srcS {
				srcS.c2sRtrn <- nil
				break
			}
			if i == 0 {
				// This is a bit of a hack, but client to server headers are only readable after first client msg is
				// received but must be written to server stream before the first msg is flushed.
				// This is the only place to do it nicely.
				md, err := srcS.strm.Header()
				if err != nil {
					srcS.c2sRtrn <- err
					break
				}
				// Update the metadata for the response.
				if f.metaKey != NoMeta {
					if f.metaVal == "" {
						// We could also always just do this
						md.Set(f.metaKey, f.be.name)
					} else {
						md.Set(f.metaKey, f.metaVal)
					}
				}
				if err := dst.SendHeader(md); err != nil {
					srcS.c2sRtrn <- err
					break
				}
			}
			log.Debugf("Northbound frame %v", f.payload)
			if err := dst.SendMsg(f); err != nil {
				srcS.c2sRtrn <- err
				break
			}
		}
	}

	// There should be AT LEAST one open stream at this point.
	// If there isn't, it's a grave error in the code and it will
	// cause this thread to block here, so check for it and
	// don't let the lock-up happen but report the error.
	ret := make(chan error, 1)
	agg := make(chan *beClStrm)
	atLeastOne := false
	for _, strm := range src.strms {
		if strm != nil {
			go fc2s(strm)
			go func(s *beClStrm) { // Wait on result and aggregate
				r := <-s.c2sRtrn // got the return code
				if r == nil {
					return // We're the redundant stream, just die
				}
				s.c2sRtrn <- r // put it back to pass it along
				agg <- s       // send the stream to the aggregator
			}(strm)
			atLeastOne = true
		}
	}
	if atLeastOne == true {
		go func() { // Wait on aggregated result
			s := <-agg
			ret <- <-s.c2sRtrn
		}()
	} else {
		err := errors.New("There are no open streams. Unable to forward message.")
		log.Error(err)
		ret <- err
	}
	return ret
}

func (strms *beClStrms) sendAll(f *nbFrame) error {
	var rtrn error

	atLeastOne := false
	for _, strm := range strms.strms {
		if strm != nil {
			if err := strm.strm.SendMsg(f); err != nil {
				strm.s2cRtrn = err
			}
			atLeastOne = true
		}
	}
	// If one of the streams succeeded, declare success;
	// if none did, pick an error and return it.
	if atLeastOne == true {
		for _, strm := range strms.strms {
			if strm != nil {
				rtrn = strm.s2cRtrn
				if rtrn == nil {
					return rtrn
				}
			}
		}
		return rtrn
	} else {
		rtrn = errors.New("There are no open streams, this should never happen")
		log.Error(rtrn)
	}
	return rtrn
}

func (dst *beClStrms) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error {
	ret := make(chan error, 1)
	go func() {
		// The frame buffer already has the results of a first
		// RecvMsg in it so the first thing to do is to
		// send it to the list of client streams and only
		// then read some more.
		for i := 0; ; i++ {
			// Send the message to each of the backend streams
			if err := dst.sendAll(f); err != nil {
				ret <- err
				break
			}
			log.Debugf("Southbound frame %v", f.payload)
			if err := src.RecvMsg(f); err != nil {
				ret <- err // this can be io.EOF which is the happy case
				break
			}
		}
	}()
	return ret
}

func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
	var rtrn_err bool = false

	log.Debugf("Configuring the backend with %v", *conf)
	// Validate the config and configure the backend
	be := &backend{name: conf.Name, connections: make(map[string]*beConnection), opnConns: 0}
	idx := strIndex([]string(beTypeNames), conf.Type)
	if idx == 0 {
		log.Errorf("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.beType = idx

	idx = strIndex(asTypeNames, conf.Association.Strategy)
	if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
		log.Errorf("An association strategy must be provided if the backend "+
			"type is active/active for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.activeAssoc.strategy = idx

	idx = strIndex(alTypeNames, conf.Association.Location)
	if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
		log.Errorf("An association location must be provided if the backend "+
			"type is active/active for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.activeAssoc.location = idx

	if conf.Association.Field == "" && be.activeAssoc.location == AL_PROTOBUF {
		log.Errorf("An association field must be provided if the backend "+
			"type is active/active and the location is set to protobuf "+
			"for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.activeAssoc.field = conf.Association.Field
	if rtrn_err {
		return nil, errors.New("Backend configuration failed")
	}
	// Configure the connections
	// Connections can consist of just a name. This allows for dynamic configuration
	// at a later time.
	// TODO: validate that there is one connection for all but active/active backends
	for _, cnConf := range conf.Connections {
		if cnConf.Name == "" {
			log.Errorf("A connection must have a name for backend %s in cluster %s",
				conf.Name, clusterName)
		} else {
			gc := &gConnection{conn: nil, cncl: nil, state: connectivity.Idle}
			be.connections[cnConf.Name] = &beConnection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, bknd: be, gConn: gc}
			if cnConf.Addr != "" { // Otherwise the connection will be specified later.
				if ip := net.ParseIP(cnConf.Addr); ip == nil {
					log.Errorf("The IP address for connection %s in backend %s in cluster %s is invalid",
						cnConf.Name, conf.Name, clusterName)
					rtrn_err = true
				}
				// Validate the port number. This just validates that it's a non-zero integer in range.
				if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
					log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
						cnConf.Port, cnConf.Name, conf.Name, clusterName)
					rtrn_err = true
				}
			}
		}
	}
	if rtrn_err {
		return nil, errors.New("Connection configuration failed")
	}
	// All is well, start the backend cluster connections
	be.connectAll()

	return be, nil
}
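
// Illustrative sketch: a connection may be configured with only a name so that
// its address and port can be supplied dynamically later; connect() skips
// dialing until an address is present. The values below are hypothetical:
//
//	{Name: "vcore11"}                                  // placeholder, dialed once an address is set
//	{Name: "vcore11", Addr: "10.0.0.1", Port: "50057"} // validated and dialed by connectAll()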

//***************************************************************//
//********************* Backend Functions ***********************//
//***************************************************************//

func (be *backend) incConn() {
	be.lck.Lock()
	defer be.lck.Unlock()
	be.opnConns++
}

func (be *backend) decConn() {
	be.lck.Lock()
	defer be.lck.Unlock()
	be.opnConns--
	if be.opnConns < 0 {
		log.Error("Internal error, number of open connections less than 0")
		be.opnConns = 0
	}
}

// Attempts to establish all the connections for a backend
// any failures result in an abort. This should only be called
// on a first attempt to connect. Individual connections should be
// handled after that.
func (be *backend) connectAll() {
	for _, cn := range be.connections {
		cn.connect()
	}
}

func (cn *beConnection) connect() {
	if cn.addr != "" && cn.getConn() == nil {
		log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name, cn.addr, cn.port)
		// Dial doesn't block, it just returns and continues connecting in the background.
		// Check back later to confirm and increase the connection count.
		ctx, cnclFnc := context.WithCancel(context.Background()) // Context for canceling the connection
		cn.setCncl(cnclFnc)
		if conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure()); err != nil {
			log.Errorf("Dialing connection %v:%v", cn, err)
			cn.waitAndTryAgain(ctx)
		} else {
			cn.setConn(conn)
			log.Debugf("Starting the connection monitor for '%s'", cn.name)
			cn.monitor(ctx)
		}
	} else if cn.addr == "" {
		log.Infof("No address supplied for connection '%s', not connecting for now", cn.name)
	} else {
		log.Debugf("Connection '%s' is already connected, ignoring", cn.name)
	}
}

func (cn *beConnection) waitAndTryAgain(ctx context.Context) {
	go func(ctx context.Context) {
		ctxTm, cnclTm := context.WithTimeout(context.Background(), 10*time.Second)
		select {
			case <-ctxTm.Done():
				cnclTm()
				log.Debugf("Trying to connect '%s'", cn.name)
				// Connect creates a new context so cancel this one.
				cn.cancel()
				cn.connect()
				return
			case <-ctx.Done():
				cnclTm()
				return
		}
	}(ctx)
}

func (cn *beConnection) cancel() {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	log.Debugf("Canceling connection %s", cn.name)
	if cn.gConn != nil {
		if cn.gConn.cncl != nil {
			cn.gConn.cncl()
		} else {
			log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
		}
	} else {
		log.Errorf("Internal error, attempting to cancel on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) setCncl(cncl context.CancelFunc) {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		cn.gConn.cncl = cncl
	} else {
		log.Errorf("Internal error, attempting to set a cancel function on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) setConn(conn *grpc.ClientConn) {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		cn.gConn.conn = conn
	} else {
		log.Errorf("Internal error, attempting to set a connection on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) getConn() *grpc.ClientConn {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		return cn.gConn.conn
	}
	return nil
}

func (cn *beConnection) close() {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	log.Debugf("Closing connection %s", cn.name)
	if cn.gConn != nil && cn.gConn.conn != nil {
		if cn.gConn.conn.GetState() == connectivity.Ready {
			cn.bknd.decConn() // Decrease the connection reference
		}
		if cn.gConn.cncl != nil {
			cn.gConn.cncl() // Cancel the context first to force monitor functions to exit
		} else {
			log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
		}
		cn.gConn.conn.Close() // Close the connection
		// Now replace the gConn object with a new one as this one just
		// fades away as references to it are released after the close
		// finishes in the background.
		cn.gConn = &gConnection{conn: nil, cncl: nil, state: connectivity.TransientFailure}
	} else {
		log.Errorf("Internal error, attempt to close a nil connection object for '%s'", cn.name)
	}
}

func (cn *beConnection) setState(st connectivity.State) {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		cn.gConn.state = st
	} else {
		log.Errorf("Internal error, attempting to set connection state on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) getState() connectivity.State {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		if cn.gConn.conn != nil {
			return cn.gConn.conn.GetState()
		} else {
			log.Errorf("Internal error, attempting to get connection state on a nil connection: '%s'", cn.name)
		}
	} else {
		log.Errorf("Internal error, attempting to get connection state on a nil connection object: '%s'", cn.name)
	}
	// For lack of a better state to use. The logs will help determine what happened here.
	return connectivity.TransientFailure
}


func (cn *beConnection) monitor(ctx context.Context) {
	bp := cn.bknd
	log.Debugf("Setting up monitoring for backend %s", bp.name)
	go func(ctx context.Context) {
		var delay time.Duration = 100 //ms
		for {
			//log.Debugf("****** Monitoring connection '%s' on backend '%s', %v", cn.name, bp.name, cn.conn)
			if cn.getState() == connectivity.Ready {
				log.Debugf("Connection '%s' on backend '%s' has become ready", cn.name, bp.name)
				cn.setState(connectivity.Ready)
				bp.incConn()
				if cn.getConn() != nil && cn.getConn().WaitForStateChange(ctx, connectivity.Ready) == false {
					// The context was canceled. This is done by the close function
					// so just exit the routine
					log.Debugf("Context canceled for connection '%s' on backend '%s'", cn.name, bp.name)
					return
				}
				if cs := cn.getConn(); cs != nil {
					switch cs := cn.getState(); cs {
						case connectivity.TransientFailure:
							cn.setState(cs)
							bp.decConn()
							log.Infof("Transient failure for connection '%s' on backend '%s'", cn.name, bp.name)
							delay = 100
						case connectivity.Shutdown:
							// The connection was closed. The assumption here is that the closer
							// will manage the connection count and setting the conn to nil.
							// Exit the routine
							log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, bp.name)
							return
						case connectivity.Idle:
							// This can only happen if the server sends a GoAway. This can
							// only happen if the server has modified MaxConnectionIdle from
							// its default of infinity. The only solution here is to close the
							// connection and keepTrying()?
							//TODO: Read the grpc source code to see if there's a different approach
							log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, bp.name)
							cn.close()
							cn.connect()
							return
					}
				} else { // A nil means something went horribly wrong, error and exit.
					log.Errorf("Something horrible happened, the connection is nil and shouldn't be for connection %s", cn.name)
					return
				}
			} else {
				log.Debugf("Waiting for connection '%s' on backend '%s' to become ready", cn.name, bp.name)
				ctxTm, cnclTm := context.WithTimeout(context.Background(), delay*time.Millisecond)
				if delay < 30000 {
					delay += delay
				}
				select {
					case <-ctxTm.Done():
						cnclTm() // Doubt this is required but it's harmless.
						// Do nothing but let the loop continue
					case <-ctx.Done():
						// Context was closed, close and exit routine
						//cn.close() NO! let the close be managed externally!
						return
				}
			}
		}
	}(ctx)
}
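
// Illustrative note on the monitor's retry pacing: while a connection is not
// Ready the polling delay starts at 100 ms and doubles on every pass for as
// long as it is still below 30 s (100, 200, 400, ... ms), and it is reset to
// 100 ms whenever a transient failure is observed.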

// Set a callback for connection failure notification
// This is currently not used.
func (bp *backend) setConnFailCallback(cb func(string, *backend) bool) {
	bp.connFailCallback = cb
}