| /* |
| * Copyright 2018-present Open Networking Foundation |
| |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| // gRPC affinity router with active/active backends |
| |
| package afrouter |
| |
| // Backend manager handles redundant connections per backend |
| |
import (
	"errors"
	"fmt"
	"io"
	"net"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/opencord/voltha-go/common/log"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/metadata"
)
| |
| |
| |
| const ( |
| BE_ACTIVE_ACTIVE = 1 // Backend type active/active |
| BE_SERVER = 2 // Backend type single server |
| BE_SEQ_RR = 0 // Backend sequence round robin |
| AS_NONE = 0 // Association strategy: none |
| AS_SERIAL_NO = 1 // Association strategy: serial number |
| AL_NONE = 0 // Association location: none |
| AL_HEADER = 1 // Association location: header |
| AL_PROTOBUF = 2 // Association location: protobuf |
| ) |
| |
| |
| var beTypeNames = []string{"","active_active","server"} |
| var asTypeNames = []string{"","serial_number"} |
| var alTypeNames = []string{"","header","protobuf"} |
| |
var bClusters = make(map[string]*backendCluster)
| |
| type backendCluster struct { |
| name string |
| //backends map[string]*backend |
| backends []*backend |
| beRvMap map[*backend]int |
| serialNoSource chan uint64 |
| } |
| |
| type backend struct { |
| lck sync.Mutex |
| name string |
| beType int |
| activeAssoc assoc |
| connFailCallback func(string, *backend)bool |
| connections map[string]*beConnection |
| srtdConns []*beConnection |
| opnConns int |
| } |
| |
| type assoc struct { |
| strategy int |
| location int |
| field string // Used only if location is protobuf |
| key string |
| } |
| |
| type beConnection struct { |
| lck sync.Mutex |
| cncl context.CancelFunc |
| name string |
| addr string |
| port string |
| gConn *gConnection |
| bknd *backend |
| } |
| |
// This structure should never be referenced by any
// routine outside of the *beConnection methods.
| type gConnection struct { |
| lck sync.Mutex |
| state connectivity.State |
| conn *grpc.ClientConn |
| cncl context.CancelFunc |
| } |
| |
| type beClStrm struct { |
| strm grpc.ClientStream |
| ctxt context.Context |
| cncl context.CancelFunc |
| ok2Close chan struct{} |
| c2sRtrn chan error |
| s2cRtrn error |
| } |
| |
| type beClStrms struct { |
| lck sync.Mutex |
| actvStrm *beClStrm |
| strms map[string]*beClStrm |
| srtdStrms []*beClStrm |
| } |
| |
| //***************************************************************// |
| //****************** BackendCluster Functions *******************// |
| //***************************************************************// |
| |
//TODO: Move the backend type (active/active etc.) to the cluster
// level. All backends should really be of the same type.
// newBackendCluster creates and validates a backend cluster from the
// provided configuration and registers it in the global cluster map.
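// For illustration only (the concrete schema lives with the afrouter
// configuration types, not in this file), a cluster configuration is expected
// to carry roughly the following fields, with the type/strategy/location
// values drawn from beTypeNames, asTypeNames and alTypeNames above; the
// names, addresses and ports below are made-up placeholders:
//   Name: "cluster1"
//   Backends:
//     - Name: "backend1"
//       Type: "active_active"
//       Association: {Strategy: "serial_number", Location: "header", Key: "some-header-key"}
//       Connections:
//         - {Name: "conn1", Addr: "127.0.0.1", Port: "50057"}
//         - {Name: "conn2", Addr: "127.0.0.1", Port: "50058"}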
| func newBackendCluster(conf *BackendClusterConfig) (*backendCluster, error) { |
| var err error = nil |
| var rtrn_err bool = false |
| var be *backend |
| log.Debugf("Creating a backend cluster with %v", conf) |
| // Validate the configuration |
| if conf.Name == "" { |
| log.Error("A backend cluster must have a name") |
| rtrn_err = true |
| } |
| //bc := &backendCluster{name:conf.Name,backends:make(map[string]*backend)} |
| bc := &backendCluster{name:conf.Name, beRvMap:make(map[*backend]int)} |
| bClusters[bc.name] = bc |
	bc.startSerialNumberSource() // Serial number source for active/active backends
| idx := 0 |
| for _, bec := range conf.Backends { |
| if bec.Name == "" { |
| log.Errorf("A backend must have a name in cluster %s\n", conf.Name) |
| rtrn_err = true |
| } |
| if be,err = newBackend(&bec, conf.Name); err != nil { |
| log.Errorf("Error creating backend %s", bec.Name) |
| rtrn_err = true |
| } |
| bc.backends = append(bc.backends, be) |
| bc.beRvMap[bc.backends[idx]] = idx |
| idx++ |
| } |
| if rtrn_err { |
| return nil, errors.New("Error creating backend(s)") |
| } |
| return bc, nil |
| } |
| |
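// getBackend returns the backend with the given name from the cluster,
// or nil if no backend with that name exists.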
| func (bc * backendCluster) getBackend(name string) *backend { |
| for _,v := range bc.backends { |
| if v.name == name { |
| return v |
| } |
| } |
| return nil |
| } |
| |
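// startSerialNumberSource creates the cluster's serial number channel and
// starts a goroutine that feeds it an ever-increasing sequence of numbers.
// A serial number is consumed for each southbound transaction.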
| func (bc *backendCluster) startSerialNumberSource() { |
| bc.serialNoSource = make(chan uint64) |
| var counter uint64 = 0 |
| // This go routine only dies on exit, it is not a leak |
| go func() { |
| for { |
| bc.serialNoSource <- counter |
| counter++ |
| } |
| }() |
| } |
| |
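// nextBackend selects the backend to use after the given one according to
// the requested sequence. Currently only round robin is supported; the
// search skips backends with no open connections and returns an error if
// none are available.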
| func (bc *backendCluster) nextBackend(be *backend, seq int) (*backend,error) { |
| switch seq { |
| case BE_SEQ_RR: // Round robin |
| in := be |
		// If the previous backend is nil, start from the first backend.
		// If no backend with an open connection is found, return nil.
| if be == nil { |
| log.Debug("Previous backend is nil") |
| be = bc.backends[0] |
| in = be |
| if be.opnConns != 0 { |
| return be,nil |
| } |
| } |
| for { |
| log.Debugf("Requesting a new backend starting from %s", be.name) |
| cur := bc.beRvMap[be] |
| cur++ |
| if cur >= len(bc.backends) { |
| cur = 0 |
| } |
| log.Debugf("Next backend is %d:%s", cur, bc.backends[cur].name) |
| if bc.backends[cur].opnConns > 0 { |
| return bc.backends[cur], nil |
| } |
| if bc.backends[cur] == in { |
| err := fmt.Errorf("No backend with open connections found") |
				log.Debug(err)
| return nil,err |
| } |
| be = bc.backends[cur] |
| log.Debugf("Backend '%s' has no open connections, trying next", bc.backends[cur].name) |
| } |
	default: // Invalid, default to round robin
| log.Errorf("Invalid backend sequence %d. Defaulting to round robin", seq) |
| return bc.nextBackend(be, BE_SEQ_RR) |
| } |
| } |
| |
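// handler is the top level handler for a northbound request on the cluster.
// It allocates the northbound frame, lets the router assign a backend based
// on the first received message, and then delegates the stream proxying to
// that backend's handler.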
| func (bec *backendCluster) handler(srv interface{}, serverStream grpc.ServerStream, r Router, mthdSlice []string, |
| mk string, mv string) error { |
| //func (bec *backendCluster) handler(nbR * nbRequest) error { |
| |
	// The backend to use needs to be determined here. For non-affinity routed backends it could
	// be determined immediately, but for affinity routed backends the first message must be received
	// before the backend can be determined. To keep things simple, the same approach is taken for
	// both cases for now.
| |
| // Get the backend to use. |
| // Allocate the nbFrame here since it holds the "context" of this communication |
| nf := &nbFrame{router:r, mthdSlice:mthdSlice, serNo:bec.serialNoSource, metaKey:mk, metaVal:mv} |
| log.Debugf("Nb frame allocate with method %s", nf.mthdSlice[REQ_METHOD]) |
| |
| if be,err := bec.assignBackend(serverStream, nf); err != nil { |
| // At this point, no backend streams have been initiated |
| // so just return the error. |
| return err |
| } else { |
| log.Debugf("Backend '%s' selected", be.name) |
| // Allocate a sbFrame here because it might be needed for return value intercept |
| sf := &sbFrame{router:r, be:be, method:nf.mthdSlice[REQ_METHOD], metaKey:mk, metaVal:mv} |
| log.Debugf("Sb frame allocated with router %s",r.Name()) |
| return be.handler(srv, serverStream, nf, sf) |
| } |
| } |
| |
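// openSouthboundStreams opens a client stream to each ready connection of
// the backend, propagating the incoming metadata and a per-transaction
// serial number. It succeeds if at least one stream could be opened.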
| func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f * nbFrame) (*beClStrms, error) { |
| |
| rtrn := &beClStrms{strms:make(map[string]*beClStrm),actvStrm:nil} |
| |
| log.Debugf("Opening southbound streams for method '%s'", f.mthdSlice[REQ_METHOD]) |
| // Get the metadata from the incoming message on the server |
| md, ok := metadata.FromIncomingContext(serverStream.Context()) |
| if !ok { |
| return nil, errors.New("Could not get a server stream metadata") |
| } |
| |
| // TODO: Need to check if this is an active/active backend cluster |
| // with a serial number in the header. |
| serialNo := <-f.serNo |
| log.Debugf("Serial number for transaction allocated: %d", serialNo) |
	// If even one stream can be created then proceed. If none can be
	// created then report an error because both the primary and redundant
	// connections are non-existent.
| var atLeastOne bool = false |
| var errStr strings.Builder |
| log.Debugf("There are %d connections to open", len(be.connections)) |
| for _,cn := range be.srtdConns { |
| // TODO: THIS IS A HACK to suspend redundancy for binding routers for all calls |
| // and its very specific to a use case. There should really be a per method |
| // mechanism to select non-redundant calls for all router types. This needs |
| // to be fixed ASAP. The overrides should be used for this, the implementation |
| // is simple, and it can be done here. |
		if atLeastOne && f.metaKey != NoMeta {
| // Don't open any more southbound streams |
| log.Debugf("Not opening any more SB streams, metaKey = %s", f.metaKey) |
| rtrn.strms[cn.name] = nil |
| continue |
| } |
| // Copy in the metadata |
| if cn.getState() == connectivity.Ready && cn.getConn() != nil { |
| log.Debugf("Opening southbound stream for connection '%s'", cn.name) |
| // Create an outgoing context that includes the incoming metadata |
| // and that will cancel if the server's context is canceled |
| clientCtx, clientCancel := context.WithCancel(serverStream.Context()) |
| clientCtx = metadata.NewOutgoingContext(clientCtx, md.Copy()) |
| //TODO: Same check here, only add the serial number if necessary |
| clientCtx = metadata.AppendToOutgoingContext(clientCtx, "voltha_serial_number", |
| strconv.FormatUint(serialNo,10)) |
| // Create the client stream |
| if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, |
| cn.getConn(), f.mthdSlice[REQ_ALL]); err !=nil { |
| log.Debugf("Failed to create a client stream '%s', %v",cn.name,err) |
| fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err) |
| rtrn.strms[cn.name] = nil |
| } else { |
| rtrn.strms[cn.name] = &beClStrm{strm:clientStream, ctxt:clientCtx, |
| cncl:clientCancel, s2cRtrn:nil, |
| ok2Close:make(chan struct{}), |
| c2sRtrn:make(chan error, 1)} |
| atLeastOne = true |
| } |
		} else if cn.getConn() == nil {
			err := fmt.Errorf("Connection '%s' is closed", cn.name)
			fmt.Fprint(&errStr, err.Error())
			log.Debug(err)
		} else {
			err := fmt.Errorf("Connection '%s' isn't ready", cn.name)
			fmt.Fprint(&errStr, err.Error())
			log.Debug(err)
		}
| } |
	if atLeastOne {
| rtrn.sortStreams() |
| return rtrn,nil |
| } |
| fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ",be.name) |
| log.Error(errStr.String()) |
| return nil, errors.New(errStr.String()) |
| } |
| |
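// handler proxies a single gRPC call between the northbound server stream
// and the backend's southbound client streams, forwarding in both
// directions until both sides have completed or an error occurs.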
| func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf * nbFrame, sf * sbFrame) error { |
| |
| // Set up and launch each individual southbound stream |
| var beStrms *beClStrms |
| var rtrn error = nil |
| var s2cOk bool = false |
| var c2sOk bool = false |
| |
| beStrms, err := be.openSouthboundStreams(srv,serverStream,nf) |
| if err != nil { |
| log.Errorf("openStreams failed: %v",err) |
| return err |
| } |
| // If we get here, there has to be AT LEAST ONE open stream |
| |
| // *Do not explicitly close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate. |
| // Channels do not have to be closed, it is just a control flow mechanism, see |
| // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ |
| |
| log.Debug("Starting server to client forwarding") |
| s2cErrChan := beStrms.forwardServerToClient(serverStream, nf) |
| |
| log.Debug("Starting client to server forwarding") |
| c2sErrChan := beStrms.forwardClientToServer(serverStream, sf) |
| |
| // We don't know which side is going to stop sending first, so we need a select between the two. |
| for i := 0; i < 2; i++ { |
| select { |
| case s2cErr := <-s2cErrChan: |
| s2cOk = true |
| log.Debug("Processing s2cErr") |
| if s2cErr == io.EOF { |
| log.Debug("s2cErr reporting EOF") |
				// This is the successful case where the sender has encountered io.EOF and won't be sending anymore.
				// The clientStream->serverStream direction may continue sending though.
| defer beStrms.closeSend() |
				if c2sOk {
| return rtrn |
| } |
| } else { |
| log.Debugf("s2cErr reporting %v",s2cErr) |
| // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need |
| // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and |
| // exit with an error to the stack |
| beStrms.clientCancel() |
| return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) |
| } |
| case c2sErr := <-c2sErrChan: |
| c2sOk = true |
| log.Debug("Processing c2sErr") |
			// This happens when the clientStream has nothing else to offer (io.EOF) or has returned a gRPC error. In
			// those two cases we may have received Trailers as part of the call. In case of other errors (stream closed)
			// the trailers will be nil.
| serverStream.SetTrailer(beStrms.trailer()) |
| // c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error. |
| // NOTE!!! with redundant backends, it's likely that one of the backends |
| // returns a response before all the data has been sent southbound and |
| // the southbound streams are closed. Should this happen one of the |
| // backends may not get the request. |
			if c2sErr != io.EOF {
				rtrn = c2sErr
			}
			log.Debugf("c2sErr reporting %v", c2sErr)
			if s2cOk {
| return rtrn |
| } |
| } |
| } |
| return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") |
| } |
| |
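// clientCancel cancels the context of every open southbound stream.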
| func (strms *beClStrms) clientCancel() { |
| for _,strm := range strms.strms { |
| if strm != nil { |
| strm.cncl() |
| } |
| } |
| } |
| |
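// closeSend waits for each southbound stream to signal that it is safe to
// close and then closes its send direction.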
| func (strms *beClStrms) closeSend() { |
| for _,strm := range strms.strms { |
| if strm != nil { |
| <-strm.ok2Close |
| log.Debug("Closing southbound stream") |
| strm.strm.CloseSend() |
| } |
| } |
| } |
| |
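// trailer returns the trailer metadata of the active southbound stream.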
| func (strms *beClStrms) trailer() metadata.MD { |
| return strms.actvStrm.strm.Trailer() |
| } |
| |
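// assignBackend receives the first message from the server stream, which
// drives the codec and router to select a backend and record it in the
// frame, then verifies that the selected backend has open connections.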
| func (bec *backendCluster) assignBackend(src grpc.ServerStream, f *nbFrame) (*backend, error) { |
| // Receive the first message from the server. This calls the assigned codec in which |
| // Unmarshal gets executed. That will use the assigned router to select a backend |
| // and add it to the frame |
| if err := src.RecvMsg(f); err != nil { |
| return nil, err |
| } |
| // Check that the backend was routable and actually has connections open. |
| // If it doesn't then return a nil backend to indicate this |
| if f.be == nil { |
| err := fmt.Errorf("Unable to route method '%s'", f.mthdSlice[REQ_METHOD]) |
| log.Error(err) |
| return nil, err |
| } else if f.be.opnConns == 0 { |
| err := fmt.Errorf("No open connections on backend '%s'", f.be.name) |
| log.Error(err) |
| return f.be, err |
| } |
| return f.be, nil |
| } |
| |
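// getActive returns the stream currently designated as the active one.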
| func (strms * beClStrms) getActive() *beClStrm { |
| strms.lck.Lock() |
| defer strms.lck.Unlock() |
| return strms.actvStrm |
| } |
| |
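// setThenGetActive designates the given stream as the active one if no
// active stream has been set yet, and returns whichever stream is active.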
| func (strms *beClStrms) setThenGetActive(strm *beClStrm) (*beClStrm) { |
| strms.lck.Lock() |
| defer strms.lck.Unlock() |
| if strms.actvStrm == nil { |
| strms.actvStrm = strm |
| } |
| return strms.actvStrm |
| } |
| |
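// forwardClientToServer forwards messages from the southbound client
// streams to the northbound server stream. Only the first stream to
// respond (the active stream) has its headers and messages forwarded;
// responses from redundant streams are discarded. The returned channel
// carries the final status of the active stream.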
| func (src *beClStrms) forwardClientToServer(dst grpc.ServerStream, f *sbFrame) chan error { |
| fc2s := func(srcS *beClStrm) { |
| for i := 0; ; i++ { |
| if err := srcS.strm.RecvMsg(f); err != nil { |
| if src.setThenGetActive(srcS) == srcS { |
| srcS.c2sRtrn <- err // this can be io.EOF which is the success case |
| } else { |
| srcS.c2sRtrn <- nil // Inactive responder |
| } |
| close(srcS.ok2Close) |
| break |
| } |
| if src.setThenGetActive(srcS) != srcS { |
| srcS.c2sRtrn <- nil |
| continue |
| } |
| if i == 0 { |
| // This is a bit of a hack, but client to server headers are only readable after first client msg is |
| // received but must be written to server stream before the first msg is flushed. |
| // This is the only place to do it nicely. |
| md, err := srcS.strm.Header() |
| if err != nil { |
| srcS.c2sRtrn <- err |
| break |
| } |
| // Update the metadata for the response. |
| if f.metaKey != NoMeta { |
| if f.metaVal == "" { |
						// We could also always just do this
| md.Set(f.metaKey, f.be.name) |
| } else { |
| md.Set(f.metaKey, f.metaVal) |
| } |
| } |
| if err := dst.SendHeader(md); err != nil { |
| srcS.c2sRtrn <- err |
| break |
| } |
| } |
| log.Debugf("Northbound frame %v", f.payload) |
| if err := dst.SendMsg(f); err != nil { |
| srcS.c2sRtrn <- err |
| break |
| } |
| } |
| } |
| |
	// There should be AT LEAST one open stream at this point.
	// If there isn't, it's a grave error in the code and it would
	// cause this routine to block here, so check for it and
	// report the error rather than letting the lockup happen.
| ret := make(chan error, 1) |
| agg := make(chan *beClStrm) |
| atLeastOne := false |
| for _,strm := range src.strms { |
| if strm != nil { |
| go fc2s(strm) |
| go func(s *beClStrm) { // Wait on result and aggregate |
| r := <-s.c2sRtrn // got the return code |
| if r == nil { |
					return // We're the redundant stream, just die
| } |
| s.c2sRtrn <- r // put it back to pass it along |
| agg <- s // send the stream to the aggregator |
| } (strm) |
| atLeastOne = true |
| } |
| } |
	if atLeastOne {
| go func() { // Wait on aggregated result |
| s := <-agg |
| ret <- <-s.c2sRtrn |
| }() |
| } else { |
| err := errors.New("There are no open streams. Unable to forward message.") |
| log.Error(err) |
| ret <- err |
| } |
| return ret |
| } |
| |
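// sendAll sends the frame to every open southbound stream. It returns nil
// if at least one send succeeded, an error from one of the failed streams
// otherwise, and a distinct error if no streams were open at all.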
| func (strms *beClStrms) sendAll(f *nbFrame) error { |
| var rtrn error |
| |
| atLeastOne := false |
| for _,strm := range strms.srtdStrms { |
| if strm != nil { |
| if err := strm.strm.SendMsg(f); err != nil { |
| log.Debugf("Error on SendMsg: %s", err.Error()) |
| strm.s2cRtrn = err |
| } |
| atLeastOne = true |
| } else { |
| log.Debugf("Nil stream") |
| } |
| } |
| // If one of the streams succeeded, declare success |
| // if none did pick an error and return it. |
	if atLeastOne {
| for _,strm := range strms.srtdStrms { |
| if strm != nil { |
| rtrn = strm.s2cRtrn |
| if rtrn == nil { |
| return rtrn |
| } |
| } |
| } |
| return rtrn |
| } else { |
| rtrn = errors.New("There are no open streams, this should never happen") |
| log.Error(rtrn) |
| } |
	return rtrn
| } |
| |
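// forwardServerToClient forwards messages from the northbound server
// stream to all southbound streams. The frame already holds the result of
// the first RecvMsg, so it is sent before any further reads. The returned
// channel carries the error (possibly io.EOF) that ended the loop.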
| func (dst *beClStrms) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error { |
| ret := make(chan error, 1) |
| go func() { |
| // The frame buffer already has the results of a first |
| // RecvMsg in it so the first thing to do is to |
| // send it to the list of client streams and only |
| // then read some more. |
| for i := 0; ; i++ { |
| // Send the message to each of the backend streams |
| if err := dst.sendAll(f); err != nil { |
| ret <- err |
| log.Debugf("SendAll failed %s", err.Error()) |
| break |
| } |
| log.Debugf("Southbound frame %v", f.payload) |
| if err := src.RecvMsg(f); err != nil { |
| ret <- err // this can be io.EOF which is happy case |
| break |
| } |
| } |
| }() |
| return ret |
| } |
| |
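// sortStreams builds the sorted stream list, ordered by connection name,
// so that southbound calls are made in a deterministic order.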
| func (st * beClStrms) sortStreams() { |
| var tmpKeys []string |
	for k := range st.strms {
| tmpKeys = append(tmpKeys, k) |
| } |
| sort.Strings(tmpKeys) |
| for _,v := range tmpKeys { |
| st.srtdStrms = append(st.srtdStrms, st.strms[v]) |
| } |
| } |
| |
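// sortConns builds the sorted connection list, ordered by connection name,
// used for deterministic active/active call ordering.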
| func (be * backend) sortConns() { |
| var tmpKeys []string |
	for k := range be.connections {
| tmpKeys = append(tmpKeys, k) |
| } |
| sort.Strings(tmpKeys) |
| for _,v := range tmpKeys { |
| be.srtdConns = append(be.srtdConns, be.connections[v]) |
| } |
| } |
| |
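// newBackend creates a backend from its configuration, validates the
// backend type, association settings, and connection addresses/ports, and
// then initiates connections to all configured endpoints.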
| func newBackend(conf *BackendConfig, clusterName string) (*backend, error) { |
| var rtrn_err bool = false |
| |
| log.Debugf("Configuring the backend with %v", *conf) |
	// Validate the config and configure the backend
| be:=&backend{name:conf.Name,connections:make(map[string]*beConnection),opnConns:0} |
| idx := strIndex([]string(beTypeNames),conf.Type) |
| if idx == 0 { |
| log.Error("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName) |
| rtrn_err = true |
| } |
| be.beType = idx |
| |
| idx = strIndex(asTypeNames, conf.Association.Strategy) |
| if idx == 0 && be.beType == BE_ACTIVE_ACTIVE { |
| log.Errorf("An association strategy must be provided if the backend "+ |
| "type is active/active for backend %s in cluster %s", conf.Name, clusterName) |
| rtrn_err = true |
| } |
| be.activeAssoc.strategy = idx |
| |
| idx = strIndex(alTypeNames, conf.Association.Location) |
| if idx == 0 && be.beType == BE_ACTIVE_ACTIVE { |
| log.Errorf("An association location must be provided if the backend "+ |
| "type is active/active for backend %s in cluster %s", conf.Name, clusterName) |
| rtrn_err = true |
| } |
| be.activeAssoc.location = idx |
| |
| if conf.Association.Field == "" && be.activeAssoc.location == AL_PROTOBUF { |
| log.Errorf("An association field must be provided if the backend "+ |
| "type is active/active and the location is set to protobuf "+ |
| "for backend %s in cluster %s", conf.Name, clusterName) |
| rtrn_err = true |
| } |
| be.activeAssoc.field = conf.Association.Field |
| |
| if conf.Association.Key == "" && be.activeAssoc.location == AL_HEADER { |
| log.Errorf("An association key must be provided if the backend "+ |
| "type is active/active and the location is set to header "+ |
| "for backend %s in cluster %s", conf.Name, clusterName) |
| rtrn_err = true |
| } |
| be.activeAssoc.key = conf.Association.Key |
| if rtrn_err { |
| return nil, errors.New("Backend configuration failed") |
| } |
| // Configure the connections |
| // Connections can consist of just a name. This allows for dynamic configuration |
| // at a later time. |
	// TODO: validate that there is one connection for all but active/active backends
	if len(conf.Connections) > 1 && be.beType != BE_ACTIVE_ACTIVE {
		log.Errorf("Only one connection must be specified if the backend "+
			"type is not active/active")
| rtrn_err = true |
| } |
| if len(conf.Connections) == 0 { |
| log.Errorf("At least one connection must be specified") |
| rtrn_err = true |
| } |
| for _,cnConf := range conf.Connections { |
| if cnConf.Name == "" { |
| log.Errorf("A connection must have a name for backend %s in cluster %s", |
| conf.Name, clusterName) |
| } else { |
| gc:=&gConnection{conn:nil,cncl:nil,state:connectivity.Idle} |
| be.connections[cnConf.Name] = &beConnection{name:cnConf.Name,addr:cnConf.Addr,port:cnConf.Port,bknd:be,gConn:gc} |
| if cnConf.Addr != "" { // This connection will be specified later. |
| if ip := net.ParseIP(cnConf.Addr); ip == nil { |
| log.Errorf("The IP address for connection %s in backend %s in cluster %s is invalid", |
| cnConf.Name, conf.Name, clusterName) |
| rtrn_err = true |
| } |
				// Validate the port number. This just validates that it's an
				// integer in the valid TCP port range (1-65535).
				if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
					log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
						cnConf.Port, cnConf.Name, conf.Name, clusterName)
					rtrn_err = true
				}
| } |
| } |
| } |
| |
| if rtrn_err { |
| return nil, errors.New("Connection configuration failed") |
| } |
| // Create the sorted connection list for deterministic |
| // active-active call orders. |
| be.sortConns() |
	// All is well, start the backend cluster connections
| be.connectAll() |
| |
| return be, nil |
| } |
| |
| //***************************************************************// |
| //********************* Backend Functions ***********************// |
| //***************************************************************// |
| |
| func (be *backend) incConn() { |
| be.lck.Lock() |
| defer be.lck.Unlock() |
| be.opnConns++ |
| } |
| |
| func (be *backend) decConn() { |
| be.lck.Lock() |
| defer be.lck.Unlock() |
| be.opnConns-- |
| if be.opnConns < 0 { |
| log.Error("Internal error, number of open connections less than 0") |
| be.opnConns = 0 |
| } |
| } |
| |
// connectAll attempts to establish all the connections for a backend.
// This should only be called on the first attempt to connect;
// individual connections are handled after that.
| func (be *backend) connectAll() { |
| for _,cn := range be.connections { |
| cn.connect() |
| } |
| } |
| |
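// connect dials the connection's address (if one has been supplied and no
// connection exists yet) and starts the connection monitor, retrying later
// if the dial fails.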
| func (cn *beConnection) connect() { |
| if cn.addr != "" && cn.getConn() == nil { |
| log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name,cn.addr,cn.port) |
| // Dial doesn't block, it just returns and continues connecting in the background. |
| // Check back later to confirm and increase the connection count. |
| ctx, cnclFnc := context.WithCancel(context.Background()) // Context for canceling the connection |
| cn.setCncl(cnclFnc) |
| if conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure()); err != nil { |
| log.Errorf("Dialng connection %v:%v",cn,err) |
| cn.waitAndTryAgain(ctx) |
| } else { |
| cn.setConn(conn) |
| log.Debugf("Starting the connection monitor for '%s'", cn.name) |
| cn.monitor(ctx) |
| } |
| } else if cn.addr == "" { |
| log.Infof("No address supplied for connection '%s', not connecting for now", cn.name) |
| } else { |
| log.Debugf("Connection '%s' is already connected, ignoring", cn.name) |
| } |
| } |
| |
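// waitAndTryAgain retries the connection after a 10 second delay unless
// the provided context is canceled first.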
| func (cn *beConnection) waitAndTryAgain(ctx context.Context) { |
| go func(ctx context.Context) { |
| ctxTm,cnclTm := context.WithTimeout(context.Background(), 10 * time.Second) |
| select { |
| case <-ctxTm.Done(): |
| cnclTm() |
| log.Debugf("Trying to connect '%s'",cn.name) |
| // Connect creates a new context so cancel this one. |
| cn.cancel() |
| cn.connect() |
| return |
| case <-ctx.Done(): |
| cnclTm() |
| return |
| } |
| }(ctx) |
| } |
| |
| func (cn *beConnection) cancel() { |
| cn.lck.Lock() |
| defer cn.lck.Unlock() |
| log.Debugf("Canceling connection %s", cn.name) |
	if cn.gConn != nil {
		if cn.gConn.cncl != nil {
			cn.gConn.cncl()
| } else { |
| log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name) |
| } |
| } else { |
| log.Errorf("Internal error, attempting to cancel on a nil connection object: '%s'", cn.name) |
| } |
| } |
| |
| func (cn *beConnection) setCncl(cncl context.CancelFunc) { |
| cn.lck.Lock() |
| defer cn.lck.Unlock() |
| if cn.gConn != nil { |
| cn.gConn.cncl = cncl |
| } else { |
| log.Errorf("Internal error, attempting to set a cancel function on a nil connection object: '%s'", cn.name) |
| } |
| } |
| |
| func (cn *beConnection) setConn(conn *grpc.ClientConn) { |
| cn.lck.Lock() |
| defer cn.lck.Unlock() |
| if cn.gConn != nil { |
| cn.gConn.conn = conn |
| } else { |
| log.Errorf("Internal error, attempting to set a connection on a nil connection object: '%s'", cn.name) |
| } |
| } |
| |
| func (cn *beConnection) getConn() *grpc.ClientConn { |
| cn.lck.Lock() |
| defer cn.lck.Unlock() |
| if cn.gConn != nil { |
| return cn.gConn.conn |
| } |
| return nil |
| } |
| |
| func (cn *beConnection) close() { |
| cn.lck.Lock() |
| defer cn.lck.Unlock() |
| log.Debugf("Closing connection %s", cn.name) |
| if cn.gConn != nil && cn.gConn.conn != nil { |
| if cn.gConn.conn.GetState() == connectivity.Ready { |
| cn.bknd.decConn() // Decrease the connection reference |
| } |
| if cn.gConn.cncl != nil { |
| cn.gConn.cncl() // Cancel the context first to force monitor functions to exit |
| } else { |
| log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name) |
| } |
| cn.gConn.conn.Close() // Close the connection |
| // Now replace the gConn object with a new one as this one just |
| // fades away as references to it are released after the close |
| // finishes in the background. |
| cn.gConn = &gConnection{conn:nil,cncl:nil,state:connectivity.TransientFailure} |
| } else { |
| log.Errorf("Internal error, attempt to close a nil connection object for '%s'", cn.name) |
| } |
| |
| } |
| |
| func (cn *beConnection) setState(st connectivity.State) { |
| cn.lck.Lock() |
| defer cn.lck.Unlock() |
| if cn.gConn != nil { |
| cn.gConn.state = st |
| } else { |
| log.Errorf("Internal error, attempting to set connection state on a nil connection object: '%s'", cn.name) |
| } |
| } |
| |
| func (cn *beConnection) getState() (connectivity.State) { |
| cn.lck.Lock() |
| defer cn.lck.Unlock() |
| if cn.gConn != nil { |
| if cn.gConn.conn != nil { |
| return cn.gConn.conn.GetState() |
| } else { |
| log.Errorf("Internal error, attempting to get connection state on a nil connection: '%s'", cn.name) |
| } |
| } else { |
| log.Errorf("Internal error, attempting to get connection state on a nil connection object: '%s'", cn.name) |
| } |
| // For lack of a better state to use. The logs will help determine what happened here. |
| return connectivity.TransientFailure |
| } |
| |
| |
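// monitor watches the gRPC connection state in a dedicated goroutine,
// maintaining the backend's open connection count and reconnecting or
// exiting as the state changes. The wait between checks backs off
// exponentially while the connection is not ready.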
| func (cn *beConnection) monitor(ctx context.Context) { |
| bp := cn.bknd |
| log.Debugf("Setting up monitoring for backend %s", bp.name) |
| go func(ctx context.Context) { |
| var delay time.Duration = 100 //ms |
| for { |
| //log.Debugf("****** Monitoring connection '%s' on backend '%s', %v", cn.name, bp.name, cn.conn) |
| if cn.getState() == connectivity.Ready { |
| log.Debugf("connection '%s' on backend '%s' becomes ready", cn.name, bp.name) |
| cn.setState(connectivity.Ready) |
| bp.incConn() |
				if cn.getConn() != nil && !cn.getConn().WaitForStateChange(ctx, connectivity.Ready) {
					// The context was canceled. This is done by the close function
					// so just exit the routine
					log.Debugf("Context canceled for connection '%s' on backend '%s'", cn.name, bp.name)
					return
				}
| if cs := cn.getConn(); cs != nil { |
| switch cs := cn.getState(); cs { |
| case connectivity.TransientFailure: |
| cn.setState(cs) |
| bp.decConn() |
| log.Infof("Transient failure for connection '%s' on backend '%s'",cn.name, bp.name) |
| delay = 100 |
| case connectivity.Shutdown: |
						// The connection was closed. The assumption here is that the closer
						// will manage the connection count and set the conn to nil.
						// Exit the routine.
| log.Infof("Shutdown for connection '%s' on backend '%s'",cn.name, bp.name) |
| return |
| case connectivity.Idle: |
| // This can only happen if the server sends a GoAway. This can |
| // only happen if the server has modified MaxConnectionIdle from |
| // its default of infinity. The only solution here is to close the |
| // connection and keepTrying()? |
| //TODO: Read the grpc source code to see if there's a different approach |
| log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'",cn.name, bp.name) |
| cn.close() |
| cn.connect() |
| return |
| } |
| } else { // A nil means something went horribly wrong, error and exit. |
| log.Errorf("Somthing horrible happned, the connection is nil and shouldn't be for connection %s",cn.name) |
| return |
| } |
| } else { |
| log.Debugf("Waiting for connection '%s' on backend '%s' to become ready", cn.name, bp.name) |
| ctxTm, cnclTm := context.WithTimeout(context.Background(), delay * time.Millisecond) |
| if delay < 30000 { |
| delay += delay |
| } |
| select { |
| case <-ctxTm.Done(): |
| cnclTm() // Doubt this is required but it's harmless. |
| // Do nothing but let the loop continue |
| case <-ctx.Done(): |
| // Context was closed, close and exit routine |
| //cn.close() NO! let the close be managed externally! |
| return |
| } |
| } |
| } |
| }(ctx) |
| } |
| |
// setConnFailCallback sets a callback for connection failure notification.
// This is currently not used.
| func (bp * backend) setConnFailCallback(cb func(string, *backend)bool) { |
| bp.connFailCallback = cb |
| } |
| |