/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// gRPC affinity router with active/active backends

package afrouter

// The backend manager handles redundant connections per backend.

| 22 | import ( |
sslobodr | 392ebd5 | 2019-01-18 12:41:49 -0500 | [diff] [blame] | 23 | "errors" |
Kent Hagerman | 0ab4cb2 | 2019-04-24 13:13:35 -0400 | [diff] [blame] | 24 | "fmt" |
| 25 | "github.com/opencord/voltha-go/common/log" |
sslobodr | 392ebd5 | 2019-01-18 12:41:49 -0500 | [diff] [blame] | 26 | "golang.org/x/net/context" |
| 27 | "google.golang.org/grpc" |
| 28 | "google.golang.org/grpc/codes" |
sslobodr | 392ebd5 | 2019-01-18 12:41:49 -0500 | [diff] [blame] | 29 | "google.golang.org/grpc/connectivity" |
Kent Hagerman | 0ab4cb2 | 2019-04-24 13:13:35 -0400 | [diff] [blame] | 30 | "google.golang.org/grpc/metadata" |
| 31 | "io" |
| 32 | "net" |
| 33 | "sort" |
| 34 | "strconv" |
| 35 | "strings" |
| 36 | "sync" |
| 37 | "time" |
sslobodr | 392ebd5 | 2019-01-18 12:41:49 -0500 | [diff] [blame] | 38 | ) |
| 39 | |
const (
	BE_ACTIVE_ACTIVE = 1 // Backend type: active/active
	BE_SERVER        = 2 // Backend type: single server
	BE_SEQ_RR        = 0 // Backend sequence: round robin
	AS_NONE          = 0 // Association strategy: none
	AS_SERIAL_NO     = 1 // Association strategy: serial number
	AL_NONE          = 0 // Association location: none
	AL_HEADER        = 1 // Association location: header
	AL_PROTOBUF      = 2 // Association location: protobuf
)

var beTypeNames = []string{"", "active_active", "server"}
var asTypeNames = []string{"", "serial_number"}
var alTypeNames = []string{"", "header", "protobuf"}

var bClusters = make(map[string]*backendCluster)

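// backendCluster bundles a set of backends under one name and tracks each
// backend's position (beRvMap) so that round-robin selection can resume
// from wherever the previous walk left off.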
type backendCluster struct {
	name           string
	backends       []*backend
	beRvMap        map[*backend]int
	serialNoSource chan uint64
}

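// backend represents a single backend server, potentially reachable over
// several redundant gRPC connections. opnConns counts how many of those
// connections are currently open and ready.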
type backend struct {
	lck              sync.Mutex
	name             string
	beType           int
	activeAssoc      assoc
	connFailCallback func(string, *backend) bool
	connections      map[string]*beConnection
	opnConns         int
}

type assoc struct {
	strategy int
	location int
	field    string // Used only if location is protobuf
	key      string
}

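// beConnection is one (possibly redundant) connection to a backend. The
// mutex guards the embedded gConnection, which must only be touched
// through the beConnection methods further below.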
type beConnection struct {
	lck   sync.Mutex
	cncl  context.CancelFunc
	name  string
	addr  string
	port  string
	gConn *gConnection
	bknd  *backend
}

// This structure should never be referred to
// by any routine outside of *beConnection
// routines.
type gConnection struct {
	lck   sync.Mutex
	state connectivity.State
	conn  *grpc.ClientConn
	cncl  context.CancelFunc
}

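// beClStrm is a single southbound client stream along with its context,
// cancel function, and the channels used to report forwarding results.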
type beClStrm struct {
	strm     grpc.ClientStream
	ctxt     context.Context
	cncl     context.CancelFunc
	ok2Close chan struct{}
	c2sRtrn  chan error
	s2cRtrn  error
}

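// beClStrms is the set of southbound streams opened for one northbound
// request. actvStrm is the stream whose responses are returned northbound;
// srtdStrms holds the same streams in deterministic (sorted) order.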
type beClStrms struct {
	lck       sync.Mutex
	actvStrm  *beClStrm
	strms     map[string]*beClStrm
	srtdStrms []*beClStrm
}

//***************************************************************//
//****************** BackendCluster Functions *******************//
//***************************************************************//

//TODO: Move the backend type (active/active etc) to the cluster
// level. All backends should really be of the same type.
// Create a new backend cluster
func newBackendCluster(conf *BackendClusterConfig) (*backendCluster, error) {
	var err error
	var rtrn_err bool
	var be *backend
	log.Debugf("Creating a backend cluster with %v", conf)
	// Validate the configuration
	if conf.Name == "" {
		log.Error("A backend cluster must have a name")
		rtrn_err = true
	}
	bc := &backendCluster{name: conf.Name, beRvMap: make(map[*backend]int)}
	bClusters[bc.name] = bc
	bc.startSerialNumberSource() // Serial number source for active/active backends
	idx := 0
	for _, bec := range conf.Backends {
		if bec.Name == "" {
			log.Errorf("A backend must have a name in cluster %s\n", conf.Name)
			rtrn_err = true
		}
		if be, err = newBackend(&bec, conf.Name); err != nil {
			log.Errorf("Error creating backend %s", bec.Name)
			rtrn_err = true
		}
		bc.backends = append(bc.backends, be)
		bc.beRvMap[bc.backends[idx]] = idx
		idx++
	}
	if rtrn_err {
		return nil, errors.New("Error creating backend(s)")
	}
	return bc, nil
}

func (bc *backendCluster) getBackend(name string) *backend {
	for _, v := range bc.backends {
		if v.name == name {
			return v
		}
	}
	return nil
}

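// startSerialNumberSource launches a goroutine that feeds a monotonically
// increasing transaction serial number over an unbuffered channel; callers
// simply pull the next number with <-bc.serialNoSource.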
func (bc *backendCluster) startSerialNumberSource() {
	bc.serialNoSource = make(chan uint64)
	var counter uint64 = 0
	// This go routine only dies on exit, it is not a leak
	go func() {
		for {
			bc.serialNoSource <- counter
			counter++
		}
	}()
}

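// nextBackend walks the cluster's backends round-robin, starting just after
// the given backend, and returns the first one with at least one open
// connection. It fails once the walk wraps around to its starting point.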
func (bc *backendCluster) nextBackend(be *backend, seq int) (*backend, error) {
	switch seq {
	case BE_SEQ_RR: // Round robin
		in := be
		// If no backend is found having a connection
		// then return nil.
		if be == nil {
			log.Debug("Previous backend is nil")
			be = bc.backends[0]
			in = be
			if be.opnConns != 0 {
				return be, nil
			}
		}
		for {
			log.Debugf("Requesting a new backend starting from %s", be.name)
			cur := bc.beRvMap[be]
			cur++
			if cur >= len(bc.backends) {
				cur = 0
			}
			log.Debugf("Next backend is %d:%s", cur, bc.backends[cur].name)
			if bc.backends[cur].opnConns > 0 {
				return bc.backends[cur], nil
			}
			if bc.backends[cur] == in {
				err := fmt.Errorf("No backend with open connections found")
				log.Debug(err)
				return nil, err
			}
			be = bc.backends[cur]
			log.Debugf("Backend '%s' has no open connections, trying next", bc.backends[cur].name)
		}
	default: // Invalid, default to round robin
		log.Errorf("Invalid backend sequence %d. Defaulting to round robin", seq)
		return bc.nextBackend(be, BE_SEQ_RR)
	}
}

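// handler proxies a single northbound gRPC call onto a backend in the
// cluster. The backend is chosen only after the first message is received,
// since affinity routing may depend on the message contents.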
func (bec *backendCluster) handler(srv interface{}, serverStream grpc.ServerStream, r Router, mthdSlice []string,
	mk string, mv string) error {

	// The final backend cluster needs to be determined here. With non-affinity routed backends it could
	// just be determined here and for affinity routed backends the first message must be received
	// before the backend is determined. In order to keep things simple, the same approach is taken for
	// now.

	// Get the backend to use.
	// Allocate the nbFrame here since it holds the "context" of this communication
	nf := &nbFrame{router: r, mthdSlice: mthdSlice, serNo: bec.serialNoSource, metaKey: mk, metaVal: mv}
	log.Debugf("Nb frame allocated with method %s", nf.mthdSlice[REQ_METHOD])

	if be, err := bec.assignBackend(serverStream, nf); err != nil {
		// At this point, no backend streams have been initiated
		// so just return the error.
		return err
	} else {
		log.Debugf("Backend '%s' selected", be.name)
		// Allocate an sbFrame here because it might be needed for return value intercept
		sf := &sbFrame{router: r, be: be, method: nf.mthdSlice[REQ_METHOD], metaKey: mk, metaVal: mv}
		log.Debugf("Sb frame allocated with router %s", r.Name())
		return be.handler(srv, serverStream, nf, sf)
	}
}

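// openSouthboundStreams opens a client stream to every Ready connection of
// the backend, propagating the northbound metadata plus a freshly allocated
// transaction serial number. It succeeds if at least one stream opens.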
func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f *nbFrame) (*beClStrms, error) {

	rtrn := &beClStrms{strms: make(map[string]*beClStrm), actvStrm: nil}

	log.Debugf("Opening southbound streams for method '%s'", f.mthdSlice[REQ_METHOD])
	// Get the metadata from the incoming message on the server
	md, ok := metadata.FromIncomingContext(serverStream.Context())
	if !ok {
		return nil, errors.New("Could not get the server stream metadata")
	}

	// TODO: Need to check if this is an active/active backend cluster
	// with a serial number in the header.
	serialNo := <-f.serNo
	log.Debugf("Serial number for transaction allocated: %d", serialNo)
	// If even one stream can be created then proceed. If none can be
	// created then report an error because both the primary and redundant
	// connections are non-existent.
	atLeastOne := false
	var errStr strings.Builder
	log.Debugf("There are %d connections to open", len(be.connections))
	for _, cn := range be.connections {
		// Copy in the metadata
		if cn.getState() == connectivity.Ready && cn.getConn() != nil {
			log.Debugf("Opening southbound stream for connection '%s'", cn.name)
			// Create an outgoing context that includes the incoming metadata
			// and that will cancel if the server's context is canceled
			clientCtx, clientCancel := context.WithCancel(serverStream.Context())
			clientCtx = metadata.NewOutgoingContext(clientCtx, md.Copy())
			//TODO: Same check here, only add the serial number if necessary
			clientCtx = metadata.AppendToOutgoingContext(clientCtx, "voltha_serial_number",
				strconv.FormatUint(serialNo, 10))
			// Create the client stream
			if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying,
				cn.getConn(), f.mthdSlice[REQ_ALL]); err != nil {
				log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
				fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err)
				rtrn.strms[cn.name] = nil
			} else {
				rtrn.strms[cn.name] = &beClStrm{strm: clientStream, ctxt: clientCtx,
					cncl: clientCancel, s2cRtrn: nil,
					ok2Close: make(chan struct{}),
					c2sRtrn:  make(chan error, 1)}
				atLeastOne = true
			}
		} else if cn.getConn() == nil {
			err := fmt.Errorf("Connection '%s' is closed", cn.name)
			fmt.Fprint(&errStr, err.Error())
			log.Debug(err)
		} else {
			err := fmt.Errorf("Connection '%s' isn't ready", cn.name)
			fmt.Fprint(&errStr, err.Error())
			log.Debug(err)
		}
	}
	if atLeastOne {
		rtrn.sortStreams()
		return rtrn, nil
	}
	fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ", be.name)
	log.Error(errStr.String())
	return nil, errors.New(errStr.String())
}

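// handler pumps messages in both directions between the northbound server
// stream and the set of southbound client streams, using a select over the
// two error channels to detect whichever side finishes first.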
func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *nbFrame, sf *sbFrame) error {

	// Set up and launch each individual southbound stream
	var beStrms *beClStrms
	var rtrn error
	var s2cOk bool
	var c2sOk bool

	beStrms, err := be.openSouthboundStreams(srv, serverStream, nf)
	if err != nil {
		log.Errorf("openStreams failed: %v", err)
		return err
	}
	// If we get here, there has to be AT LEAST ONE open stream

	// *Do not explicitly close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
	// Channels do not have to be closed, it is just a control flow mechanism, see
	// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ

	log.Debug("Starting server to client forwarding")
	s2cErrChan := beStrms.forwardServerToClient(serverStream, nf)

	log.Debug("Starting client to server forwarding")
	c2sErrChan := beStrms.forwardClientToServer(serverStream, sf)

	// We don't know which side is going to stop sending first, so we need a select between the two.
	for i := 0; i < 2; i++ {
		select {
		case s2cErr := <-s2cErrChan:
			s2cOk = true
			log.Debug("Processing s2cErr")
			if s2cErr == io.EOF {
				log.Debug("s2cErr reporting EOF")
				// this is the successful case where the sender has encountered io.EOF, and won't be sending anymore.
				// the clientStream>serverStream may continue sending though.
				defer beStrms.closeSend()
				if c2sOk {
					return rtrn
				}
			} else {
				log.Debugf("s2cErr reporting %v", s2cErr)
				// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
				// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
				// exit with an error to the stack
				beStrms.clientCancel()
				return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
			}
		case c2sErr := <-c2sErrChan:
			c2sOk = true
			log.Debug("Processing c2sErr")
			// This happens when the clientStream has nothing else to offer (io.EOF) or has returned a gRPC error. In those two
			// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
			// will be nil.
			serverStream.SetTrailer(beStrms.trailer())
			// c2sErr will contain the RPC error from client code. If not io.EOF, return the RPC error as the server stream error.
			// NOTE!!! with redundant backends, it's likely that one of the backends
			// returns a response before all the data has been sent southbound and
			// the southbound streams are closed. Should this happen one of the
			// backends may not get the request.
			if c2sErr != io.EOF {
				rtrn = c2sErr
			}
			log.Debug("c2sErr reporting EOF")
			if s2cOk {
				return rtrn
			}
		}
	}
	return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}

func (strms *beClStrms) clientCancel() {
	for _, strm := range strms.strms {
		if strm != nil {
			strm.cncl()
		}
	}
}

func (strms *beClStrms) closeSend() {
	for _, strm := range strms.strms {
		if strm != nil {
			<-strm.ok2Close
			log.Debug("Closing southbound stream")
			strm.strm.CloseSend()
		}
	}
}

func (strms *beClStrms) trailer() metadata.MD {
	return strms.actvStrm.strm.Trailer()
}

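// assignBackend receives the first northbound message, which drives the
// assigned codec (and through it the router) to pick a backend and record
// it in the frame, then verifies the chosen backend has open connections.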
func (bec *backendCluster) assignBackend(src grpc.ServerStream, f *nbFrame) (*backend, error) {
	// Receive the first message from the server. This calls the assigned codec in which
	// Unmarshal gets executed. That will use the assigned router to select a backend
	// and add it to the frame
	if err := src.RecvMsg(f); err != nil {
		return nil, err
	}
	// Check that the backend was routable and actually has connections open.
	// If it doesn't then return a nil backend to indicate this
	if f.be == nil {
		err := fmt.Errorf("Unable to route method '%s'", f.mthdSlice[REQ_METHOD])
		log.Error(err)
		return nil, err
	} else if f.be.opnConns == 0 {
		err := fmt.Errorf("No open connections on backend '%s'", f.be.name)
		log.Error(err)
		return f.be, err
	}
	return f.be, nil
}

func (strms *beClStrms) getActive() *beClStrm {
	strms.lck.Lock()
	defer strms.lck.Unlock()
	return strms.actvStrm
}

func (strms *beClStrms) setThenGetActive(strm *beClStrm) *beClStrm {
	strms.lck.Lock()
	defer strms.lck.Unlock()
	if strms.actvStrm == nil {
		strms.actvStrm = strm
	}
	return strms.actvStrm
}

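// forwardClientToServer reads responses from every southbound stream. The
// first stream to respond wins the setThenGetActive race and becomes the
// active stream; only its messages (and headers) are forwarded northbound,
// while the redundant streams' results are discarded.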
func (src *beClStrms) forwardClientToServer(dst grpc.ServerStream, f *sbFrame) chan error {
	fc2s := func(srcS *beClStrm) {
		for i := 0; ; i++ {
			if err := srcS.strm.RecvMsg(f); err != nil {
				if src.setThenGetActive(srcS) == srcS {
					srcS.c2sRtrn <- err // this can be io.EOF which is the success case
				} else {
					srcS.c2sRtrn <- nil // Inactive responder
				}
				close(srcS.ok2Close)
				break
			}
			if src.setThenGetActive(srcS) != srcS {
				srcS.c2sRtrn <- nil
				continue
			}
			if i == 0 {
				// This is a bit of a hack, but client to server headers are only readable after first client msg is
				// received but must be written to server stream before the first msg is flushed.
				// This is the only place to do it nicely.
				md, err := srcS.strm.Header()
				if err != nil {
					srcS.c2sRtrn <- err
					break
				}
				// Update the metadata for the response.
				if f.metaKey != NoMeta {
					if f.metaVal == "" {
						// We could also always just do this
						md.Set(f.metaKey, f.be.name)
					} else {
						md.Set(f.metaKey, f.metaVal)
					}
				}
				if err := dst.SendHeader(md); err != nil {
					srcS.c2sRtrn <- err
					break
				}
			}
			log.Debugf("Northbound frame %v", f.payload)
			if err := dst.SendMsg(f); err != nil {
				srcS.c2sRtrn <- err
				break
			}
		}
	}

	// There should be AT LEAST one open stream at this point.
	// If there isn't, it's a grave error in the code that would
	// cause this routine to block on the aggregation channel, so
	// check for it and report the error rather than locking up.
	ret := make(chan error, 1)
	agg := make(chan *beClStrm)
	atLeastOne := false
	for _, strm := range src.strms {
		if strm != nil {
			go fc2s(strm)
			go func(s *beClStrm) { // Wait on result and aggregate
				r := <-s.c2sRtrn // got the return code
				if r == nil {
					return // We're the redundant stream, just die
				}
				s.c2sRtrn <- r // put it back to pass it along
				agg <- s       // send the stream to the aggregator
			}(strm)
			atLeastOne = true
		}
	}
	if atLeastOne {
		go func() { // Wait on aggregated result
			s := <-agg
			ret <- <-s.c2sRtrn
		}()
	} else {
		err := errors.New("There are no open streams. Unable to forward message.")
		log.Error(err)
		ret <- err
	}
	return ret
}

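// sendAll writes one northbound frame to every open southbound stream, in
// sorted stream order, recording per-stream send errors. It returns nil if
// any stream accepted the message, otherwise one of the recorded errors.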
func (strms *beClStrms) sendAll(f *nbFrame) error {
	var rtrn error

	atLeastOne := false
	for _, strm := range strms.srtdStrms {
		if strm != nil {
			if err := strm.strm.SendMsg(f); err != nil {
				log.Debugf("Error on SendMsg: %s", err.Error())
				strm.s2cRtrn = err
			}
			atLeastOne = true
		} else {
			log.Debug("Nil stream")
		}
	}
	// If one of the streams succeeded, declare success;
	// if none did, pick an error and return it.
	if atLeastOne {
		for _, strm := range strms.srtdStrms {
			if strm != nil {
				rtrn = strm.s2cRtrn
				if rtrn == nil {
					return rtrn
				}
			}
		}
		return rtrn
	} else {
		rtrn = errors.New("There are no open streams, this should never happen")
		log.Error(rtrn)
	}
	return rtrn
}

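// forwardServerToClient relays northbound requests to all southbound
// streams. The frame already holds the first message (consumed by
// assignBackend), so it is sent before any further RecvMsg calls.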
func (dst *beClStrms) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error {
	ret := make(chan error, 1)
	go func() {
		// The frame buffer already has the results of a first
		// RecvMsg in it so the first thing to do is to
		// send it to the list of client streams and only
		// then read some more.
		for i := 0; ; i++ {
			// Send the message to each of the backend streams
			if err := dst.sendAll(f); err != nil {
				ret <- err
				log.Debugf("SendAll failed %s", err.Error())
				break
			}
			log.Debugf("Southbound frame %v", f.payload)
			if err := src.RecvMsg(f); err != nil {
				ret <- err // this can be io.EOF which is happy case
				break
			}
		}
	}()
	return ret
}

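// sortStreams builds srtdStrms by sorting the stream map's keys, so that
// sendAll always iterates the redundant streams in the same order.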
func (st *beClStrms) sortStreams() {
	var tmpKeys []string
	for k := range st.strms {
		tmpKeys = append(tmpKeys, k)
	}
	sort.Strings(tmpKeys)
	for _, v := range tmpKeys {
		st.srtdStrms = append(st.srtdStrms, st.strms[v])
	}
}

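// newBackend validates a backend's configuration (type, association
// strategy/location, and connections), builds the connection objects, and
// kicks off the initial connection attempts.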
func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
	var rtrn_err bool

	log.Debugf("Configuring the backend with %v", *conf)
	// Validate the config and configure the backend
	be := &backend{name: conf.Name, connections: make(map[string]*beConnection), opnConns: 0}
	idx := strIndex(beTypeNames, conf.Type)
	if idx == 0 {
		log.Errorf("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.beType = idx

	idx = strIndex(asTypeNames, conf.Association.Strategy)
	if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
		log.Errorf("An association strategy must be provided if the backend "+
			"type is active/active for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.activeAssoc.strategy = idx

	idx = strIndex(alTypeNames, conf.Association.Location)
	if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
		log.Errorf("An association location must be provided if the backend "+
			"type is active/active for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.activeAssoc.location = idx

	if conf.Association.Field == "" && be.activeAssoc.location == AL_PROTOBUF {
		log.Errorf("An association field must be provided if the backend "+
			"type is active/active and the location is set to protobuf "+
			"for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.activeAssoc.field = conf.Association.Field

	if conf.Association.Key == "" && be.activeAssoc.location == AL_HEADER {
		log.Errorf("An association key must be provided if the backend "+
			"type is active/active and the location is set to header "+
			"for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.activeAssoc.key = conf.Association.Key
	if rtrn_err {
		return nil, errors.New("Backend configuration failed")
	}
	// Configure the connections
	// Connections can consist of just a name. This allows for dynamic configuration
	// at a later time.
	// TODO: validate that there is one connection for all but active/active backends
	if len(conf.Connections) > 1 && be.beType != BE_ACTIVE_ACTIVE {
		log.Errorf("Only one connection may be specified if the backend " +
			"type is not 'active_active'")
		rtrn_err = true
	}
	if len(conf.Connections) == 0 {
		log.Errorf("At least one connection must be specified")
		rtrn_err = true
	}
	for _, cnConf := range conf.Connections {
		if cnConf.Name == "" {
			log.Errorf("A connection must have a name for backend %s in cluster %s",
				conf.Name, clusterName)
		} else {
			gc := &gConnection{conn: nil, cncl: nil, state: connectivity.Idle}
			be.connections[cnConf.Name] = &beConnection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, bknd: be, gConn: gc}
			if cnConf.Addr != "" { // An empty address means the connection will be specified later
				if ip := net.ParseIP(cnConf.Addr); ip == nil {
					log.Errorf("The IP address for connection %s in backend %s in cluster %s is invalid",
						cnConf.Name, conf.Name, clusterName)
					rtrn_err = true
				}
				// Validate the port number. This just validates that it's an
				// integer in the valid TCP port range.
				if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
					log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
						cnConf.Port, cnConf.Name, conf.Name, clusterName)
					rtrn_err = true
				}
			}
		}
	}

	if rtrn_err {
		return nil, errors.New("Connection configuration failed")
	}
	// All is well, start the backend cluster connections
	be.connectAll()

	return be, nil
}

//***************************************************************//
//********************* Backend Functions ***********************//
//***************************************************************//

func (be *backend) incConn() {
	be.lck.Lock()
	defer be.lck.Unlock()
	be.opnConns++
}

func (be *backend) decConn() {
	be.lck.Lock()
	defer be.lck.Unlock()
	be.opnConns--
	if be.opnConns < 0 {
		log.Error("Internal error, number of open connections less than 0")
		be.opnConns = 0
	}
}

// Attempts to establish all the connections for a backend;
// any failures result in an abort. This should only be called
// on a first attempt to connect. Individual connections should be
// handled after that.
func (be *backend) connectAll() {
	for _, cn := range be.connections {
		cn.connect()
	}
}

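// connect dials the connection's address (non-blocking; gRPC keeps trying
// in the background) and starts a monitor goroutine that tracks the
// connection state and maintains the backend's open-connection count.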
func (cn *beConnection) connect() {
	if cn.addr != "" && cn.getConn() == nil {
		log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name, cn.addr, cn.port)
		// Dial doesn't block, it just returns and continues connecting in the background.
		// Check back later to confirm and increase the connection count.
		ctx, cnclFnc := context.WithCancel(context.Background()) // Context for canceling the connection
		cn.setCncl(cnclFnc)
		if conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure()); err != nil {
			log.Errorf("Dialing connection %v: %v", cn, err)
			cn.waitAndTryAgain(ctx)
		} else {
			cn.setConn(conn)
			log.Debugf("Starting the connection monitor for '%s'", cn.name)
			cn.monitor(ctx)
		}
	} else if cn.addr == "" {
		log.Infof("No address supplied for connection '%s', not connecting for now", cn.name)
	} else {
		log.Debugf("Connection '%s' is already connected, ignoring", cn.name)
	}
}

func (cn *beConnection) waitAndTryAgain(ctx context.Context) {
	go func(ctx context.Context) {
		ctxTm, cnclTm := context.WithTimeout(context.Background(), 10*time.Second)
		select {
		case <-ctxTm.Done():
			cnclTm()
			log.Debugf("Trying to connect '%s'", cn.name)
			// Connect creates a new context so cancel this one.
			cn.cancel()
			cn.connect()
			return
		case <-ctx.Done():
			cnclTm()
			return
		}
	}(ctx)
}

func (cn *beConnection) cancel() {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	log.Debugf("Canceling connection %s", cn.name)
	if cn.gConn != nil {
		if cn.gConn.cncl != nil {
			cn.gConn.cncl()
		} else {
			log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
		}
	} else {
		log.Errorf("Internal error, attempting to cancel on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) setCncl(cncl context.CancelFunc) {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		cn.gConn.cncl = cncl
	} else {
		log.Errorf("Internal error, attempting to set a cancel function on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) setConn(conn *grpc.ClientConn) {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		cn.gConn.conn = conn
	} else {
		log.Errorf("Internal error, attempting to set a connection on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) getConn() *grpc.ClientConn {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		return cn.gConn.conn
	}
	return nil
}

func (cn *beConnection) close() {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	log.Debugf("Closing connection %s", cn.name)
	if cn.gConn != nil && cn.gConn.conn != nil {
		if cn.gConn.conn.GetState() == connectivity.Ready {
			cn.bknd.decConn() // Decrease the connection reference
		}
		if cn.gConn.cncl != nil {
			cn.gConn.cncl() // Cancel the context first to force monitor functions to exit
		} else {
			log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
		}
		cn.gConn.conn.Close() // Close the connection
		// Now replace the gConn object with a new one as this one just
		// fades away as references to it are released after the close
		// finishes in the background.
		cn.gConn = &gConnection{conn: nil, cncl: nil, state: connectivity.TransientFailure}
	} else {
		log.Errorf("Internal error, attempt to close a nil connection object for '%s'", cn.name)
	}
}

func (cn *beConnection) setState(st connectivity.State) {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		cn.gConn.state = st
	} else {
		log.Errorf("Internal error, attempting to set connection state on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) getState() connectivity.State {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		if cn.gConn.conn != nil {
			return cn.gConn.conn.GetState()
		} else {
			log.Errorf("Internal error, attempting to get connection state on a nil connection: '%s'", cn.name)
		}
	} else {
		log.Errorf("Internal error, attempting to get connection state on a nil connection object: '%s'", cn.name)
	}
	// For lack of a better state to use. The logs will help determine what happened here.
	return connectivity.TransientFailure
}

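// monitor watches the connection's state transitions: it bumps the
// backend's open-connection count on Ready, reacts to TransientFailure,
// Shutdown, and Idle (server GoAway), and while waiting for Ready polls
// with a doubling backoff that starts at 100ms.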
func (cn *beConnection) monitor(ctx context.Context) {
	bp := cn.bknd
	log.Debugf("Setting up monitoring for backend %s", bp.name)
	go func(ctx context.Context) {
		var delay time.Duration = 100 // ms
		for {
			//log.Debugf("****** Monitoring connection '%s' on backend '%s', %v", cn.name, bp.name, cn.conn)
			if cn.getState() == connectivity.Ready {
				log.Debugf("connection '%s' on backend '%s' becomes ready", cn.name, bp.name)
				cn.setState(connectivity.Ready)
				bp.incConn()
				if cn.getConn() != nil && !cn.getConn().WaitForStateChange(ctx, connectivity.Ready) {
					// The context was canceled. This is done by the close function
					// so just exit the routine
					log.Debugf("Context canceled for connection '%s' on backend '%s'", cn.name, bp.name)
					return
				}
				if cs := cn.getConn(); cs != nil {
					switch cs := cn.getState(); cs {
					case connectivity.TransientFailure:
						cn.setState(cs)
						bp.decConn()
						log.Infof("Transient failure for connection '%s' on backend '%s'", cn.name, bp.name)
						delay = 100
					case connectivity.Shutdown:
						// The connection was closed. The assumption here is that the closer
						// will manage the connection count and setting the conn to nil.
						// Exit the routine
						log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, bp.name)
						return
					case connectivity.Idle:
						// This can only happen if the server sends a GoAway. This can
						// only happen if the server has modified MaxConnectionIdle from
						// its default of infinity. The only solution here is to close the
						// connection and keepTrying()?
						//TODO: Read the grpc source code to see if there's a different approach
						log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, bp.name)
						cn.close()
						cn.connect()
						return
					}
				} else { // A nil means something went horribly wrong, error and exit.
					log.Errorf("Something horrible happened, the connection is nil and shouldn't be for connection %s", cn.name)
					return
				}
			} else {
				log.Debugf("Waiting for connection '%s' on backend '%s' to become ready", cn.name, bp.name)
				ctxTm, cnclTm := context.WithTimeout(context.Background(), delay*time.Millisecond)
				if delay < 30000 {
					delay += delay
				}
				select {
				case <-ctxTm.Done():
					cnclTm() // Doubt this is required but it's harmless.
					// Do nothing but let the loop continue
				case <-ctx.Done():
					// Context was closed, close and exit routine
					//cn.close() NO! let the close be managed externally!
					return
				}
			}
		}
	}(ctx)
}

// Set a callback for connection failure notification.
// This is currently not used.
func (bp *backend) setConnFailCallback(cb func(string, *backend) bool) {
	bp.connFailCallback = cb
}