/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// gRPC affinity router with active/active backends

package afrouter

// Backend manager handles redundant connections per backend

import (
    "io"
    "fmt"
    "net"
    "sync"
    "time"
    "sort"
    "errors"
    "strconv"
    "strings"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/connectivity"
    "github.com/opencord/voltha-go/common/log"
)

const (
    BE_ACTIVE_ACTIVE = 1 // Backend type active/active
    BE_SERVER        = 2 // Backend type single server
    BE_SEQ_RR        = 0 // Backend sequence round robin
    AS_NONE          = 0 // Association strategy: none
    AS_SERIAL_NO     = 1 // Association strategy: serial number
    AL_NONE          = 0 // Association location: none
    AL_HEADER        = 1 // Association location: header
    AL_PROTOBUF      = 2 // Association location: protobuf
)
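
// The beTypeNames, asTypeNames and alTypeNames slices below map the
// configuration strings onto the constants above: strIndex() returns the
// position of the configured string in the slice, and position 0 (the
// empty string) is reserved to mean "invalid or unspecified" (see the
// idx == 0 checks in newBackend).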
var beTypeNames = []string{"", "active_active", "server"}
var asTypeNames = []string{"", "serial_number"}
var alTypeNames = []string{"", "header", "protobuf"}

var bClusters map[string]*backendCluster = make(map[string]*backendCluster)

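// Object hierarchy used in this file:
//   backendCluster - a named group of backends; hands out transaction
//                    serial numbers and picks the next backend (round robin).
//   backend        - one member of a cluster; owns a single connection for a
//                    plain server backend or several redundant connections
//                    for an active/active backend.
//   beConnection   - one gRPC connection to a backend instance, wrapping a
//                    gConnection and its monitor goroutine.
//   beClStrms      - the set of southbound client streams opened for one
//                    northbound call; the first stream to answer becomes
//                    the "active" stream and its reply is forwarded north.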
type backendCluster struct {
    name string
    //backends map[string]*backend
    backends []*backend
    beRvMap map[*backend]int
    serialNoSource chan uint64
}

type backend struct {
    lck sync.Mutex
    name string
    beType int
    activeAssoc assoc
    connFailCallback func(string, *backend) bool
    connections map[string]*beConnection
    srtdConns []*beConnection
    opnConns int
}

type assoc struct {
    strategy int
    location int
    field string // Used only if location is protobuf
    key string
}

type beConnection struct {
    lck sync.Mutex
    cncl context.CancelFunc
    name string
    addr string
    port string
    gConn *gConnection
    bknd *backend
}

// This structure should never be referred to
// by any routine outside of *beConnection
// routines.
type gConnection struct {
    lck sync.Mutex
    state connectivity.State
    conn *grpc.ClientConn
    cncl context.CancelFunc
}

type beClStrm struct {
    strm grpc.ClientStream
    ctxt context.Context
    cncl context.CancelFunc
    ok2Close chan struct{}
    c2sRtrn chan error
    s2cRtrn error
}

type beClStrms struct {
    lck sync.Mutex
    actvStrm *beClStrm
    strms map[string]*beClStrm
    srtdStrms []*beClStrm
}

//***************************************************************//
//****************** BackendCluster Functions *******************//
//***************************************************************//

//TODO: Move the backend type (active/active etc) to the cluster
// level. All backends should really be of the same type.
// Create a new backend cluster
func newBackendCluster(conf *BackendClusterConfig) (*backendCluster, error) {
    var err error = nil
    var rtrn_err bool = false
    var be *backend
    log.Debugf("Creating a backend cluster with %v", conf)
    // Validate the configuration
    if conf.Name == "" {
        log.Error("A backend cluster must have a name")
        rtrn_err = true
    }
    //bc := &backendCluster{name:conf.Name,backends:make(map[string]*backend)}
    bc := &backendCluster{name: conf.Name, beRvMap: make(map[*backend]int)}
    bClusters[bc.name] = bc
    bc.startSerialNumberSource() // Serial number source for active/active backends
    idx := 0
    for _, bec := range conf.Backends {
        if bec.Name == "" {
            log.Errorf("A backend must have a name in cluster %s\n", conf.Name)
            rtrn_err = true
        }
        if be, err = newBackend(&bec, conf.Name); err != nil {
            log.Errorf("Error creating backend %s", bec.Name)
            rtrn_err = true
        }
        bc.backends = append(bc.backends, be)
        bc.beRvMap[bc.backends[idx]] = idx
        idx++
    }
    if rtrn_err {
        return nil, errors.New("Error creating backend(s)")
    }
    return bc, nil
}

func (bc *backendCluster) getBackend(name string) *backend {
    for _, v := range bc.backends {
        if v.name == name {
            return v
        }
    }
    return nil
}

func (bc *backendCluster) startSerialNumberSource() {
    bc.serialNoSource = make(chan uint64)
    var counter uint64 = 0
    // This goroutine only dies on exit, it is not a leak
    go func() {
        for {
            bc.serialNoSource <- counter
            counter++
        }
    }()
}
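
// Each northbound transaction draws one value from serialNoSource, e.g. in
// openSouthboundStreams:
//
//    serialNo := <-f.serNo
//
// The number is appended to the southbound metadata as "voltha_serial_number"
// so that active/active backends can associate the redundant copies of the
// same request (association strategy "serial_number").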

func (bc *backendCluster) nextBackend(be *backend, seq int) (*backend, error) {
    switch seq {
    case BE_SEQ_RR: // Round robin
        in := be
        // If no backend is found having a connection
        // then return nil.
        if be == nil {
            log.Debug("Previous backend is nil")
            be = bc.backends[0]
            in = be
            if be.opnConns != 0 {
                return be, nil
            }
        }
        for {
            log.Debugf("Requesting a new backend starting from %s", be.name)
            cur := bc.beRvMap[be]
            cur++
            if cur >= len(bc.backends) {
                cur = 0
            }
            log.Debugf("Next backend is %d:%s", cur, bc.backends[cur].name)
            if bc.backends[cur].opnConns > 0 {
                return bc.backends[cur], nil
            }
            if bc.backends[cur] == in {
                err := fmt.Errorf("No backend with open connections found")
                log.Debug(err)
                return nil, err
            }
            be = bc.backends[cur]
            log.Debugf("Backend '%s' has no open connections, trying next", bc.backends[cur].name)
        }
    default: // Invalid, default to round robin
        log.Errorf("Invalid backend sequence %d. Defaulting to round robin", seq)
        return bc.nextBackend(be, BE_SEQ_RR)
    }
}

func (bec *backendCluster) handler(srv interface{}, serverStream grpc.ServerStream, r Router, mthdSlice []string,
    mk string, mv string) error {
//func (bec *backendCluster) handler(nbR * nbRequest) error {

    // The final backend needs to be determined here. With non-affinity routed backends it could
    // just be determined here and for affinity routed backends the first message must be received
    // before the backend is determined. In order to keep things simple, the same approach is taken for
    // now.

    // Get the backend to use.
    // Allocate the nbFrame here since it holds the "context" of this communication
    nf := &nbFrame{router: r, mthdSlice: mthdSlice, serNo: bec.serialNoSource, metaKey: mk, metaVal: mv}
    log.Debugf("Nb frame allocated with method %s", nf.mthdSlice[REQ_METHOD])

    if be, err := bec.assignBackend(serverStream, nf); err != nil {
        // At this point, no backend streams have been initiated
        // so just return the error.
        return err
    } else {
        log.Debugf("Backend '%s' selected", be.name)
        // Allocate a sbFrame here because it might be needed for return value intercept
        sf := &sbFrame{router: r, be: be, method: nf.mthdSlice[REQ_METHOD], metaKey: mk, metaVal: mv}
        log.Debugf("Sb frame allocated with router %s", r.Name())
        return be.handler(srv, serverStream, nf, sf)
    }
}

func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f *nbFrame) (*beClStrms, error) {

    rtrn := &beClStrms{strms: make(map[string]*beClStrm), actvStrm: nil}

    log.Debugf("Opening southbound streams for method '%s'", f.mthdSlice[REQ_METHOD])
    // Get the metadata from the incoming message on the server
    md, ok := metadata.FromIncomingContext(serverStream.Context())
    if !ok {
        return nil, errors.New("Could not get the server stream metadata")
    }

    // TODO: Need to check if this is an active/active backend cluster
    // with a serial number in the header.
    serialNo := <-f.serNo
    log.Debugf("Serial number for transaction allocated: %d", serialNo)
    // If even one stream can be created then proceed. If none can be
    // created then report an error because both the primary and redundant
    // connections are non-existent.
    var atLeastOne bool = false
    var errStr strings.Builder
    log.Debugf("There are %d connections to open", len(be.connections))
    for _, cn := range be.srtdConns {
        // TODO: THIS IS A HACK to suspend redundancy for binding routers for all calls
        // and it's very specific to a use case. There should really be a per-method
        // mechanism to select non-redundant calls for all router types. This needs
        // to be fixed ASAP. The overrides should be used for this, the implementation
        // is simple, and it can be done here.
        if atLeastOne == true && f.metaKey != NoMeta {
            // Don't open any more southbound streams
            log.Debugf("Not opening any more SB streams, metaKey = %s", f.metaKey)
            rtrn.strms[cn.name] = nil
            continue
        }
        // Copy in the metadata
        if cn.getState() == connectivity.Ready && cn.getConn() != nil {
            log.Debugf("Opening southbound stream for connection '%s'", cn.name)
            // Create an outgoing context that includes the incoming metadata
            // and that will cancel if the server's context is canceled
            clientCtx, clientCancel := context.WithCancel(serverStream.Context())
            clientCtx = metadata.NewOutgoingContext(clientCtx, md.Copy())
            //TODO: Same check here, only add the serial number if necessary
            clientCtx = metadata.AppendToOutgoingContext(clientCtx, "voltha_serial_number",
                strconv.FormatUint(serialNo, 10))
            // Create the client stream
            if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying,
                cn.getConn(), f.mthdSlice[REQ_ALL]); err != nil {
                log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
                fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err)
                rtrn.strms[cn.name] = nil
            } else {
                rtrn.strms[cn.name] = &beClStrm{strm: clientStream, ctxt: clientCtx,
                    cncl: clientCancel, s2cRtrn: nil,
                    ok2Close: make(chan struct{}),
                    c2sRtrn: make(chan error, 1)}
                atLeastOne = true
            }
        } else if cn.getConn() == nil {
            err := fmt.Errorf("Connection '%s' is closed", cn.name)
            fmt.Fprint(&errStr, err.Error())
            log.Debug(err)
        } else {
            err := fmt.Errorf("Connection '%s' isn't ready", cn.name)
            fmt.Fprint(&errStr, err.Error())
            log.Debug(err)
        }
    }
    if atLeastOne == true {
        rtrn.sortStreams()
        return rtrn, nil
    }
    fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ", be.name)
    log.Error(errStr.String())
    return nil, errors.New(errStr.String())
}

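// The handler below stitches the single northbound (server) stream to the
// one or more southbound (client) streams opened above. Two forwarding
// goroutine groups run concurrently: forwardServerToClient copies each
// northbound message to every open southbound stream, while
// forwardClientToServer copies the first responder's reply (the "active"
// stream) back north. The select loop below waits for both directions to
// report completion and propagates trailers from the active stream.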
func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *nbFrame, sf *sbFrame) error {

    // Set up and launch each individual southbound stream
    var beStrms *beClStrms
    var rtrn error = nil
    var s2cOk bool = false
    var c2sOk bool = false

    beStrms, err := be.openSouthboundStreams(srv, serverStream, nf)
    if err != nil {
        log.Errorf("openStreams failed: %v", err)
        return err
    }
    // If we get here, there has to be AT LEAST ONE open stream

    // *Do not explicitly close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
    // Channels do not have to be closed, it is just a control flow mechanism, see
    // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ

    log.Debug("Starting server to client forwarding")
    s2cErrChan := beStrms.forwardServerToClient(serverStream, nf)

    log.Debug("Starting client to server forwarding")
    c2sErrChan := beStrms.forwardClientToServer(serverStream, sf)

    // We don't know which side is going to stop sending first, so we need a select between the two.
    for i := 0; i < 2; i++ {
        select {
        case s2cErr := <-s2cErrChan:
            s2cOk = true
            log.Debug("Processing s2cErr")
            if s2cErr == io.EOF {
                log.Debug("s2cErr reporting EOF")
                // this is the successful case where the sender has encountered io.EOF, and won't be sending anymore.
                // the clientStream>serverStream may continue sending though.
                defer beStrms.closeSend()
                if c2sOk == true {
                    return rtrn
                }
            } else {
                log.Debugf("s2cErr reporting %v", s2cErr)
                // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
                // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
                // exit with an error to the stack
                beStrms.clientCancel()
                return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
            }
        case c2sErr := <-c2sErrChan:
            c2sOk = true
            log.Debug("Processing c2sErr")
            // This happens when the clientStream has nothing else to offer (io.EOF) or has returned a gRPC error. In those two
            // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
            // will be nil.
            serverStream.SetTrailer(beStrms.trailer())
            // c2sErr will contain the RPC error from the client code. If not io.EOF return the RPC error as the server stream error.
            // NOTE!!! with redundant backends, it's likely that one of the backends
            // returns a response before all the data has been sent southbound and
            // the southbound streams are closed. Should this happen one of the
            // backends may not get the request.
            if c2sErr != io.EOF {
                rtrn = c2sErr
            }
            log.Debug("c2sErr reporting EOF")
            if s2cOk == true {
                return rtrn
            }
        }
    }
    return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}

func (strms *beClStrms) clientCancel() {
    for _, strm := range strms.strms {
        if strm != nil {
            strm.cncl()
        }
    }
}

func (strms *beClStrms) closeSend() {
    for _, strm := range strms.strms {
        if strm != nil {
            <-strm.ok2Close
            log.Debug("Closing southbound stream")
            strm.strm.CloseSend()
        }
    }
}

func (strms *beClStrms) trailer() metadata.MD {
    return strms.actvStrm.strm.Trailer()
}

func (bec *backendCluster) assignBackend(src grpc.ServerStream, f *nbFrame) (*backend, error) {
    // Receive the first message from the server. This calls the assigned codec in which
    // Unmarshal gets executed. That will use the assigned router to select a backend
    // and add it to the frame
    if err := src.RecvMsg(f); err != nil {
        return nil, err
    }
    // Check that the backend was routable and actually has connections open.
    // If it doesn't then return a nil backend to indicate this
    if f.be == nil {
        err := fmt.Errorf("Unable to route method '%s'", f.mthdSlice[REQ_METHOD])
        log.Error(err)
        return nil, err
    } else if f.be.opnConns == 0 {
        err := fmt.Errorf("No open connections on backend '%s'", f.be.name)
        log.Error(err)
        return f.be, err
    }
    return f.be, nil
}

func (strms *beClStrms) getActive() *beClStrm {
    strms.lck.Lock()
    defer strms.lck.Unlock()
    return strms.actvStrm
}

func (strms *beClStrms) setThenGetActive(strm *beClStrm) *beClStrm {
    strms.lck.Lock()
    defer strms.lck.Unlock()
    if strms.actvStrm == nil {
        strms.actvStrm = strm
    }
    return strms.actvStrm
}

func (src *beClStrms) forwardClientToServer(dst grpc.ServerStream, f *sbFrame) chan error {
    fc2s := func(srcS *beClStrm) {
        for i := 0; ; i++ {
            if err := srcS.strm.RecvMsg(f); err != nil {
                if src.setThenGetActive(srcS) == srcS {
                    srcS.c2sRtrn <- err // this can be io.EOF which is the success case
                } else {
                    srcS.c2sRtrn <- nil // Inactive responder
                }
                close(srcS.ok2Close)
                break
            }
            if src.setThenGetActive(srcS) != srcS {
                srcS.c2sRtrn <- nil
                continue
            }
            if i == 0 {
                // This is a bit of a hack, but client to server headers are only readable after the first client msg is
                // received but must be written to the server stream before the first msg is flushed.
                // This is the only place to do it nicely.
                md, err := srcS.strm.Header()
                if err != nil {
                    srcS.c2sRtrn <- err
                    break
                }
                // Update the metadata for the response.
                if f.metaKey != NoMeta {
                    if f.metaVal == "" {
                        // We could also always just do this
                        md.Set(f.metaKey, f.be.name)
                    } else {
                        md.Set(f.metaKey, f.metaVal)
                    }
                }
                if err := dst.SendHeader(md); err != nil {
                    srcS.c2sRtrn <- err
                    break
                }
            }
            log.Debugf("Northbound frame %v", f.payload)
            if err := dst.SendMsg(f); err != nil {
                srcS.c2sRtrn <- err
                break
            }
        }
    }

    // There should be AT LEAST one open stream at this point.
    // If there isn't, it's a grave error in the code and it will
    // cause this routine to block here, so check for it and
    // report the error rather than locking up.
    ret := make(chan error, 1)
    agg := make(chan *beClStrm)
    atLeastOne := false
    for _, strm := range src.strms {
        if strm != nil {
            go fc2s(strm)
            go func(s *beClStrm) { // Wait on result and aggregate
                r := <-s.c2sRtrn // got the return code
                if r == nil {
                    return // We're the redundant stream, just die
                }
                s.c2sRtrn <- r // put it back to pass it along
                agg <- s // send the stream to the aggregator
            }(strm)
            atLeastOne = true
        }
    }
    if atLeastOne == true {
        go func() { // Wait on aggregated result
            s := <-agg
            ret <- <-s.c2sRtrn
        }()
    } else {
        err := errors.New("There are no open streams. Unable to forward message.")
        log.Error(err)
        ret <- err
    }
    return ret
}

func (strms *beClStrms) sendAll(f *nbFrame) error {
    var rtrn error

    atLeastOne := false
    for _, strm := range strms.srtdStrms {
        if strm != nil {
            if err := strm.strm.SendMsg(f); err != nil {
                log.Debugf("Error on SendMsg: %s", err.Error())
                strm.s2cRtrn = err
            }
            atLeastOne = true
        } else {
            log.Debugf("Nil stream")
        }
    }
    // If one of the streams succeeded, declare success;
    // if none did, pick an error and return it.
    if atLeastOne == true {
        for _, strm := range strms.srtdStrms {
            if strm != nil {
                rtrn = strm.s2cRtrn
                if rtrn == nil {
                    return rtrn
                }
            }
        }
        return rtrn
    } else {
        rtrn = errors.New("There are no open streams, this should never happen")
        log.Error(rtrn)
    }
    return rtrn
}

func (dst *beClStrms) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error {
    ret := make(chan error, 1)
    go func() {
        // The frame buffer already has the results of a first
        // RecvMsg in it so the first thing to do is to
        // send it to the list of client streams and only
        // then read some more.
        for i := 0; ; i++ {
            // Send the message to each of the backend streams
            if err := dst.sendAll(f); err != nil {
                ret <- err
                log.Debugf("SendAll failed %s", err.Error())
                break
            }
            log.Debugf("Southbound frame %v", f.payload)
            if err := src.RecvMsg(f); err != nil {
                ret <- err // this can be io.EOF which is the happy case
                break
            }
        }
    }()
    return ret
}

func (st *beClStrms) sortStreams() {
    var tmpKeys []string
    for k := range st.strms {
        tmpKeys = append(tmpKeys, k)
    }
    sort.Strings(tmpKeys)
    for _, v := range tmpKeys {
        st.srtdStrms = append(st.srtdStrms, st.strms[v])
    }
}

func (be *backend) sortConns() {
    var tmpKeys []string
    for k := range be.connections {
        tmpKeys = append(tmpKeys, k)
    }
    sort.Strings(tmpKeys)
    for _, v := range tmpKeys {
        be.srtdConns = append(be.srtdConns, be.connections[v])
    }
}

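// For reference, newBackend below expects a BackendConfig with roughly the
// shape sketched here. The field names are taken from their usage in this
// file; the actual struct (and its JSON tags) is defined in the package's
// config code, and the values are purely illustrative:
//
//    BackendConfig{
//        Name: "example-backend",
//        Type: "active_active",         // one of beTypeNames
//        Association: {
//            Strategy: "serial_number", // one of asTypeNames
//            Location: "header",        // one of alTypeNames
//            Key:      "example-key",   // required when Location is "header"
//        },
//        Connections: [
//            {Name: "conn1", Addr: "10.0.0.1", Port: "50057"},
//            {Name: "conn2", Addr: "10.0.0.2", Port: "50057"},
//        ],
//    }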
func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
    var rtrn_err bool = false

    log.Debugf("Configuring the backend with %v", *conf)
    // Validate the config and configure the backend
    be := &backend{name: conf.Name, connections: make(map[string]*beConnection), opnConns: 0}
    idx := strIndex([]string(beTypeNames), conf.Type)
    if idx == 0 {
        log.Errorf("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName)
        rtrn_err = true
    }
    be.beType = idx

    idx = strIndex(asTypeNames, conf.Association.Strategy)
    if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
        log.Errorf("An association strategy must be provided if the backend "+
            "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
        rtrn_err = true
    }
    be.activeAssoc.strategy = idx

    idx = strIndex(alTypeNames, conf.Association.Location)
    if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
        log.Errorf("An association location must be provided if the backend "+
            "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
        rtrn_err = true
    }
    be.activeAssoc.location = idx

    if conf.Association.Field == "" && be.activeAssoc.location == AL_PROTOBUF {
        log.Errorf("An association field must be provided if the backend "+
            "type is active/active and the location is set to protobuf "+
            "for backend %s in cluster %s", conf.Name, clusterName)
        rtrn_err = true
    }
    be.activeAssoc.field = conf.Association.Field

    if conf.Association.Key == "" && be.activeAssoc.location == AL_HEADER {
        log.Errorf("An association key must be provided if the backend "+
            "type is active/active and the location is set to header "+
            "for backend %s in cluster %s", conf.Name, clusterName)
        rtrn_err = true
    }
    be.activeAssoc.key = conf.Association.Key
    if rtrn_err {
        return nil, errors.New("Backend configuration failed")
    }
    // Configure the connections
    // Connections can consist of just a name. This allows for dynamic configuration
    // at a later time.
    // TODO: validate that there is one connection for all but active/active backends
    if len(conf.Connections) > 1 && be.activeAssoc.strategy != BE_ACTIVE_ACTIVE {
        log.Errorf("Only one connection must be specified if the association "+
            "strategy is not set to 'active_active'")
        rtrn_err = true
    }
    if len(conf.Connections) == 0 {
        log.Errorf("At least one connection must be specified")
        rtrn_err = true
    }
    for _, cnConf := range conf.Connections {
        if cnConf.Name == "" {
            log.Errorf("A connection must have a name for backend %s in cluster %s",
                conf.Name, clusterName)
        } else {
            gc := &gConnection{conn: nil, cncl: nil, state: connectivity.Idle}
            be.connections[cnConf.Name] = &beConnection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, bknd: be, gConn: gc}
            if cnConf.Addr != "" { // An empty address means the connection will be specified later
                if ip := net.ParseIP(cnConf.Addr); ip == nil {
                    log.Errorf("The IP address for connection %s in backend %s in cluster %s is invalid",
                        cnConf.Name, conf.Name, clusterName)
                    rtrn_err = true
                }
                // Validate the port number. This just validates that it's an integer in the range 1-65535
                if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
                    log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
                        cnConf.Port, cnConf.Name, conf.Name, clusterName)
                    rtrn_err = true
                }
            }
        }
    }

    if rtrn_err {
        return nil, errors.New("Connection configuration failed")
    }
    // Create the sorted connection list for deterministic
    // active-active call orders.
    be.sortConns()
    // All is well, start the backend cluster connections
    be.connectAll()

    return be, nil
}

//***************************************************************//
//********************* Backend Functions ***********************//
//***************************************************************//

func (be *backend) incConn() {
    be.lck.Lock()
    defer be.lck.Unlock()
    be.opnConns++
}

func (be *backend) decConn() {
    be.lck.Lock()
    defer be.lck.Unlock()
    be.opnConns--
    if be.opnConns < 0 {
        log.Error("Internal error, number of open connections less than 0")
        be.opnConns = 0
    }
}

// Attempts to establish all the connections for a backend.
// Any failures result in an abort. This should only be called
// on a first attempt to connect. Individual connections should be
// handled after that.
func (be *backend) connectAll() {
    for _, cn := range be.connections {
        cn.connect()
    }
}

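// Connection lifecycle: connect() dials the gRPC connection (non-blocking)
// and hands the resulting context to monitor(), which watches the
// connectivity state and increments/decrements the backend's open
// connection count as the state moves in and out of Ready. While a
// connection is not Ready the monitor polls with a delay that starts at
// 100ms and doubles while it remains under 30 seconds; a failed dial is
// retried by waitAndTryAgain() after a fixed 10 second wait.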
func (cn *beConnection) connect() {
    if cn.addr != "" && cn.getConn() == nil {
        log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name, cn.addr, cn.port)
        // Dial doesn't block, it just returns and continues connecting in the background.
        // Check back later to confirm and increase the connection count.
        ctx, cnclFnc := context.WithCancel(context.Background()) // Context for canceling the connection
        cn.setCncl(cnclFnc)
        if conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure()); err != nil {
            log.Errorf("Dialing connection %v:%v", cn, err)
            cn.waitAndTryAgain(ctx)
        } else {
            cn.setConn(conn)
            log.Debugf("Starting the connection monitor for '%s'", cn.name)
            cn.monitor(ctx)
        }
    } else if cn.addr == "" {
        log.Infof("No address supplied for connection '%s', not connecting for now", cn.name)
    } else {
        log.Debugf("Connection '%s' is already connected, ignoring", cn.name)
    }
}

func (cn *beConnection) waitAndTryAgain(ctx context.Context) {
    go func(ctx context.Context) {
        ctxTm, cnclTm := context.WithTimeout(context.Background(), 10*time.Second)
        select {
        case <-ctxTm.Done():
            cnclTm()
            log.Debugf("Trying to connect '%s'", cn.name)
            // Connect creates a new context so cancel this one.
            cn.cancel()
            cn.connect()
            return
        case <-ctx.Done():
            cnclTm()
            return
        }
    }(ctx)
}

func (cn *beConnection) cancel() {
    cn.lck.Lock()
    defer cn.lck.Unlock()
    log.Debugf("Canceling connection %s", cn.name)
    if cn.gConn != nil {
        if cn.gConn.cncl != nil {
            cn.gConn.cncl()
        } else {
            log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
        }
    } else {
        log.Errorf("Internal error, attempting to cancel on a nil connection object: '%s'", cn.name)
    }
}

func (cn *beConnection) setCncl(cncl context.CancelFunc) {
    cn.lck.Lock()
    defer cn.lck.Unlock()
    if cn.gConn != nil {
        cn.gConn.cncl = cncl
    } else {
        log.Errorf("Internal error, attempting to set a cancel function on a nil connection object: '%s'", cn.name)
    }
}

func (cn *beConnection) setConn(conn *grpc.ClientConn) {
    cn.lck.Lock()
    defer cn.lck.Unlock()
    if cn.gConn != nil {
        cn.gConn.conn = conn
    } else {
        log.Errorf("Internal error, attempting to set a connection on a nil connection object: '%s'", cn.name)
    }
}

func (cn *beConnection) getConn() *grpc.ClientConn {
    cn.lck.Lock()
    defer cn.lck.Unlock()
    if cn.gConn != nil {
        return cn.gConn.conn
    }
    return nil
}

func (cn *beConnection) close() {
    cn.lck.Lock()
    defer cn.lck.Unlock()
    log.Debugf("Closing connection %s", cn.name)
    if cn.gConn != nil && cn.gConn.conn != nil {
        if cn.gConn.conn.GetState() == connectivity.Ready {
            cn.bknd.decConn() // Decrease the connection reference
        }
        if cn.gConn.cncl != nil {
            cn.gConn.cncl() // Cancel the context first to force monitor functions to exit
        } else {
            log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
        }
        cn.gConn.conn.Close() // Close the connection
        // Now replace the gConn object with a new one as this one just
        // fades away as references to it are released after the close
        // finishes in the background.
        cn.gConn = &gConnection{conn: nil, cncl: nil, state: connectivity.TransientFailure}
    } else {
        log.Errorf("Internal error, attempt to close a nil connection object for '%s'", cn.name)
    }
}

func (cn *beConnection) setState(st connectivity.State) {
    cn.lck.Lock()
    defer cn.lck.Unlock()
    if cn.gConn != nil {
        cn.gConn.state = st
    } else {
        log.Errorf("Internal error, attempting to set connection state on a nil connection object: '%s'", cn.name)
    }
}

func (cn *beConnection) getState() connectivity.State {
    cn.lck.Lock()
    defer cn.lck.Unlock()
    if cn.gConn != nil {
        if cn.gConn.conn != nil {
            return cn.gConn.conn.GetState()
        } else {
            log.Errorf("Internal error, attempting to get connection state on a nil connection: '%s'", cn.name)
        }
    } else {
        log.Errorf("Internal error, attempting to get connection state on a nil connection object: '%s'", cn.name)
    }
    // For lack of a better state to use. The logs will help determine what happened here.
    return connectivity.TransientFailure
}


func (cn *beConnection) monitor(ctx context.Context) {
    bp := cn.bknd
    log.Debugf("Setting up monitoring for backend %s", bp.name)
    go func(ctx context.Context) {
        var delay time.Duration = 100 //ms
        for {
            //log.Debugf("****** Monitoring connection '%s' on backend '%s', %v", cn.name, bp.name, cn.conn)
            if cn.getState() == connectivity.Ready {
                log.Debugf("connection '%s' on backend '%s' becomes ready", cn.name, bp.name)
                cn.setState(connectivity.Ready)
                bp.incConn()
                if cn.getConn() != nil && cn.getConn().WaitForStateChange(ctx, connectivity.Ready) == false {
                    // The context was canceled. This is done by the close function
                    // so just exit the routine
                    log.Debugf("Context canceled for connection '%s' on backend '%s'", cn.name, bp.name)
                    return
                }
                if cs := cn.getConn(); cs != nil {
                    switch cs := cn.getState(); cs {
                    case connectivity.TransientFailure:
                        cn.setState(cs)
                        bp.decConn()
                        log.Infof("Transient failure for connection '%s' on backend '%s'", cn.name, bp.name)
                        delay = 100
                    case connectivity.Shutdown:
                        // The connection was closed. The assumption here is that the closer
                        // will manage the connection count and setting the conn to nil.
                        // Exit the routine
                        log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, bp.name)
                        return
                    case connectivity.Idle:
                        // This can only happen if the server sends a GoAway. This can
                        // only happen if the server has modified MaxConnectionIdle from
                        // its default of infinity. The only solution here is to close the
                        // connection and keepTrying()?
                        //TODO: Read the grpc source code to see if there's a different approach
                        log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, bp.name)
                        cn.close()
                        cn.connect()
                        return
                    }
                } else { // A nil means something went horribly wrong, error and exit.
                    log.Errorf("Something horrible happened, the connection is nil and shouldn't be for connection %s", cn.name)
                    return
                }
            } else {
                log.Debugf("Waiting for connection '%s' on backend '%s' to become ready", cn.name, bp.name)
                ctxTm, cnclTm := context.WithTimeout(context.Background(), delay*time.Millisecond)
                if delay < 30000 {
                    delay += delay
                }
                select {
                case <-ctxTm.Done():
                    cnclTm() // Doubt this is required but it's harmless.
                    // Do nothing but let the loop continue
                case <-ctx.Done():
                    // Context was closed, close and exit routine
                    //cn.close() NO! let the close be managed externally!
                    return
                }
            }
        }
    }(ctx)
}

// Set a callback for connection failure notification.
// This is currently not used.
func (bp *backend) setConnFailCallback(cb func(string, *backend) bool) {
    bp.connFailCallback = cb
}