/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// gRPC affinity router with active/active backends

package afrouter

// Backend manager handles redundant connections per backend

import (
	"errors"
	"fmt"
	"github.com/opencord/voltha-go/common/log"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/metadata"
	"io"
	"net"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"
)

const (
	BE_ACTIVE_ACTIVE = 1 // Backend type active/active
	BE_SERVER        = 2 // Backend type single server
	BE_SEQ_RR        = 0 // Backend sequence round robin
	AS_NONE          = 0 // Association strategy: none
	AS_SERIAL_NO     = 1 // Association strategy: serial number
	AL_NONE          = 0 // Association location: none
	AL_HEADER        = 1 // Association location: header
	AL_PROTOBUF      = 2 // Association location: protobuf
)

var beTypeNames = []string{"", "active_active", "server"}
var asTypeNames = []string{"", "serial_number"}
var alTypeNames = []string{"", "header", "protobuf"}
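
// For reference, the three xxTypeNames slices above map config strings onto
// the constants: a string's index in its slice is the constant's value, and
// index 0 (the empty string) means "unspecified". A purely illustrative
// cluster config fragment is sketched below; the field names are assumptions
// inferred from their use in this file, and the authoritative schema is the
// BackendClusterConfig/BackendConfig structs this package parses:
//
//	{
//	  "name": "vcore",
//	  "backends": [{
//	    "name": "vcore1",
//	    "type": "active_active",
//	    "association": {
//	      "strategy": "serial_number",
//	      "location": "header",
//	      "key": "voltha_backend_name"
//	    },
//	    "connections": [
//	      {"name": "vcore11", "addr": "10.6.0.1", "port": "50057"},
//	      {"name": "vcore12", "addr": "10.6.0.2", "port": "50057"}
//	    ]
//	  }]
//	}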

var bClusters map[string]*backendCluster = make(map[string]*backendCluster)

type backendCluster struct {
	name string
	//backends map[string]*backend
	backends       []*backend
	beRvMap        map[*backend]int
	serialNoSource chan uint64
}

type backend struct {
	lck              sync.Mutex
	name             string
	beType           int
	activeAssoc      assoc
	connFailCallback func(string, *backend) bool
	connections      map[string]*beConnection
	srtdConns        []*beConnection
	opnConns         int
}

type assoc struct {
	strategy int
	location int
	field    string // Used only if location is protobuf
	key      string
}

type beConnection struct {
	lck   sync.Mutex
	cncl  context.CancelFunc
	name  string
	addr  string
	port  string
	gConn *gConnection
	bknd  *backend
}

// This structure should never be referred to
// by any routine outside of *beConnection
// routines.
type gConnection struct {
	lck   sync.Mutex
	state connectivity.State
	conn  *grpc.ClientConn
	cncl  context.CancelFunc
}

type beClStrm struct {
	strm     grpc.ClientStream
	ctxt     context.Context
	cncl     context.CancelFunc
	ok2Close chan struct{}
	c2sRtrn  chan error
	s2cRtrn  error
}

type beClStrms struct {
	lck       sync.Mutex
	actvStrm  *beClStrm
	strms     map[string]*beClStrm
	srtdStrms []*beClStrm
}

//***************************************************************//
//****************** BackendCluster Functions *******************//
//***************************************************************//

//TODO: Move the backend type (active/active etc) to the cluster
// level. All backends should really be of the same type.
// Create a new backend cluster
func newBackendCluster(conf *BackendClusterConfig) (*backendCluster, error) {
	var err error = nil
	var rtrn_err bool = false
	var be *backend
	log.Debugf("Creating a backend cluster with %v", conf)
	// Validate the configuration
	if conf.Name == "" {
		log.Error("A backend cluster must have a name")
		rtrn_err = true
	}
	//bc := &backendCluster{name:conf.Name,backends:make(map[string]*backend)}
	bc := &backendCluster{name: conf.Name, beRvMap: make(map[*backend]int)}
	bClusters[bc.name] = bc
	bc.startSerialNumberSource() // Serial number source for active/active backends
	idx := 0
	for _, bec := range conf.Backends {
		if bec.Name == "" {
			log.Errorf("A backend must have a name in cluster %s\n", conf.Name)
			rtrn_err = true
		}
		if be, err = newBackend(&bec, conf.Name); err != nil {
			log.Errorf("Error creating backend %s", bec.Name)
			rtrn_err = true
		}
		bc.backends = append(bc.backends, be)
		bc.beRvMap[bc.backends[idx]] = idx
		idx++
	}
	if rtrn_err {
		return nil, errors.New("Error creating backend(s)")
	}
	return bc, nil
}

func (bc *backendCluster) getBackend(name string) *backend {
	for _, v := range bc.backends {
		if v.name == name {
			return v
		}
	}
	return nil
}

func (bc *backendCluster) startSerialNumberSource() {
	bc.serialNoSource = make(chan uint64)
	var counter uint64 = 0
	// This go routine only dies on exit, it is not a leak
	go func() {
		for {
			bc.serialNoSource <- counter
			counter++
		}
	}()
}
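
// The unbuffered serialNoSource channel doubles as a lock-free monotonic
// counter: each receive hands out the next serial number, e.g.
//
//	serialNo := <-bc.serialNoSource // as done in openSouthboundStreams
//
// A sync/atomic counter would serve as well; the channel form keeps the
// allocation strictly ordered across competing handlers.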

func (bc *backendCluster) nextBackend(be *backend, seq int) (*backend, error) {
	switch seq {
	case BE_SEQ_RR: // Round robin
		in := be
		// If no previous backend was supplied, start from the first
		// backend and use it immediately if it has open connections.
		if be == nil {
			log.Debug("Previous backend is nil")
			be = bc.backends[0]
			in = be
			if be.opnConns != 0 {
				return be, nil
			}
		}
		for {
			log.Debugf("Requesting a new backend starting from %s", be.name)
			cur := bc.beRvMap[be]
			cur++
			if cur >= len(bc.backends) {
				cur = 0
			}
			log.Debugf("Next backend is %d:%s", cur, bc.backends[cur].name)
			if bc.backends[cur].opnConns > 0 {
				return bc.backends[cur], nil
			}
			if bc.backends[cur] == in {
				err := fmt.Errorf("No backend with open connections found")
				log.Debug(err)
				return nil, err
			}
			be = bc.backends[cur]
			log.Debugf("Backend '%s' has no open connections, trying next", bc.backends[cur].name)
		}
	default: // Invalid, default to round robin
		log.Errorf("Invalid backend sequence %d. Defaulting to round robin", seq)
		return bc.nextBackend(be, BE_SEQ_RR)
	}
}

func (bec *backendCluster) handler(srv interface{}, serverStream grpc.ServerStream, r Router, mthdSlice []string,
	mk string, mv string) error {
	//func (bec *backendCluster) handler(nbR * nbRequest) error {

	// The final backend cluster needs to be determined here. With non-affinity routed backends it could
	// just be determined here and for affinity routed backends the first message must be received
	// before the backend is determined. In order to keep things simple, the same approach is taken for
	// now.

	// Get the backend to use.
	// Allocate the nbFrame here since it holds the "context" of this communication
	nf := &nbFrame{router: r, mthdSlice: mthdSlice, serNo: bec.serialNoSource, metaKey: mk, metaVal: mv}
	log.Debugf("Nb frame allocated with method %s", nf.mthdSlice[REQ_METHOD])

	if be, err := bec.assignBackend(serverStream, nf); err != nil {
		// At this point, no backend streams have been initiated
		// so just return the error.
		return err
	} else {
		log.Debugf("Backend '%s' selected", be.name)
		// Allocate a sbFrame here because it might be needed for return value intercept
		sf := &sbFrame{router: r, be: be, method: nf.mthdSlice[REQ_METHOD], metaKey: mk, metaVal: mv}
		log.Debugf("Sb frame allocated with router %s", r.Name())
		return be.handler(srv, serverStream, nf, sf)
	}
}

func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f *nbFrame) (*beClStrms, error) {

	rtrn := &beClStrms{strms: make(map[string]*beClStrm), actvStrm: nil}

	log.Debugf("Opening southbound streams for method '%s'", f.mthdSlice[REQ_METHOD])
	// Get the metadata from the incoming message on the server
	md, ok := metadata.FromIncomingContext(serverStream.Context())
	if !ok {
		return nil, errors.New("Could not get the server stream metadata")
	}

	// TODO: Need to check if this is an active/active backend cluster
	// with a serial number in the header.
	serialNo := <-f.serNo
	log.Debugf("Serial number for transaction allocated: %d", serialNo)
	// If even one stream can be created then proceed. If none can be
	// created then report an error because both the primary and redundant
	// connections are non-existent.
	var atLeastOne bool = false
	var errStr strings.Builder
	log.Debugf("There are %d connections to open", len(be.connections))
	for _, cn := range be.srtdConns {
		// TODO: THIS IS A HACK to suspend redundancy for binding routers for all calls
		// and it's very specific to a use case. There should really be a per method
		// mechanism to select non-redundant calls for all router types. This needs
		// to be fixed ASAP. The overrides should be used for this, the implementation
		// is simple, and it can be done here.
		if atLeastOne == true && f.metaKey != NoMeta {
			// Don't open any more southbound streams
			log.Debugf("Not opening any more SB streams, metaKey = %s", f.metaKey)
			rtrn.strms[cn.name] = nil
			continue
		}
		// Copy in the metadata
		if cn.getState() == connectivity.Ready && cn.getConn() != nil {
			log.Debugf("Opening southbound stream for connection '%s'", cn.name)
			// Create an outgoing context that includes the incoming metadata
			// and that will cancel if the server's context is canceled
			clientCtx, clientCancel := context.WithCancel(serverStream.Context())
			clientCtx = metadata.NewOutgoingContext(clientCtx, md.Copy())
			//TODO: Same check here, only add the serial number if necessary
			clientCtx = metadata.AppendToOutgoingContext(clientCtx, "voltha_serial_number",
				strconv.FormatUint(serialNo, 10))
			// Create the client stream
			if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying,
				cn.getConn(), f.mthdSlice[REQ_ALL]); err != nil {
				log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
				fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err)
				rtrn.strms[cn.name] = nil
			} else {
				rtrn.strms[cn.name] = &beClStrm{strm: clientStream, ctxt: clientCtx,
					cncl: clientCancel, s2cRtrn: nil,
					ok2Close: make(chan struct{}),
					c2sRtrn:  make(chan error, 1)}
				atLeastOne = true
			}
		} else if cn.getConn() == nil {
			err := fmt.Errorf("Connection '%s' is closed", cn.name)
			fmt.Fprint(&errStr, err.Error())
			log.Debug(err)
		} else {
			err := fmt.Errorf("Connection '%s' isn't ready", cn.name)
			fmt.Fprint(&errStr, err.Error())
			log.Debug(err)
		}
	}
	if atLeastOne == true {
		rtrn.sortStreams()
		return rtrn, nil
	}
	fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ", be.name)
	log.Error(errStr.String())
	return nil, errors.New(errStr.String())
}
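
// Note the context and metadata plumbing above: each southbound context is
// derived via context.WithCancel from the northbound stream's context, so
// canceling the inbound call tears down every outbound stream. The incoming
// metadata is copied forward with the allocated transaction serial number
// appended under "voltha_serial_number", presumably so that paired
// active/active backends can correlate duplicates of the same request.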

func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *nbFrame, sf *sbFrame) error {

	// Set up and launch each individual southbound stream
	var beStrms *beClStrms
	var rtrn error = nil
	var s2cOk bool = false
	var c2sOk bool = false

	beStrms, err := be.openSouthboundStreams(srv, serverStream, nf)
	if err != nil {
		log.Errorf("openStreams failed: %v", err)
		return err
	}
	// If we get here, there has to be AT LEAST ONE open stream

	// *Do not explicitly close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
	// Channels do not have to be closed, it is just a control flow mechanism, see
	// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ

	log.Debug("Starting server to client forwarding")
	s2cErrChan := beStrms.forwardServerToClient(serverStream, nf)

	log.Debug("Starting client to server forwarding")
	c2sErrChan := beStrms.forwardClientToServer(serverStream, sf)

	// We don't know which side is going to stop sending first, so we need a select between the two.
	for i := 0; i < 2; i++ {
		select {
		case s2cErr := <-s2cErrChan:
			s2cOk = true
			log.Debug("Processing s2cErr")
			if s2cErr == io.EOF {
				log.Debug("s2cErr reporting EOF")
				// this is the successful case where the sender has encountered io.EOF, and won't be sending anymore.
				// the clientStream>serverStream may continue sending though.
				defer beStrms.closeSend()
				if c2sOk == true {
					return rtrn
				}
			} else {
				log.Debugf("s2cErr reporting %v", s2cErr)
				// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
				// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
				// exit with an error to the stack
				beStrms.clientCancel()
				return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
			}
		case c2sErr := <-c2sErrChan:
			c2sOk = true
			log.Debug("Processing c2sErr")
			// This happens when the clientStream has nothing else to offer (io.EOF) or returned a gRPC error. In those two
			// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
			// will be nil.
			serverStream.SetTrailer(beStrms.trailer())
			// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
			// NOTE!!! with redundant backends, it's likely that one of the backends
			// returns a response before all the data has been sent southbound and
			// the southbound streams are closed. Should this happen one of the
			// backends may not get the request.
			if c2sErr != io.EOF {
				rtrn = c2sErr
			}
			log.Debug("c2sErr reporting EOF")
			if s2cOk == true {
				return rtrn
			}
		}
	}
	return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}
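
// This handler is the classic bidirectional gRPC proxy pump (it closely
// mirrors the handler in github.com/mwitkow/grpc-proxy, from which this code
// appears to be derived): one pump per direction, and the two-pass select
// drains both completion channels in whichever order they fire. The
// s2cOk/c2sOk flags ensure the happy-path io.EOF from one direction doesn't
// return before the other direction has reported in.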

func (strms *beClStrms) clientCancel() {
	for _, strm := range strms.strms {
		if strm != nil {
			strm.cncl()
		}
	}
}

func (strms *beClStrms) closeSend() {
	for _, strm := range strms.strms {
		if strm != nil {
			<-strm.ok2Close
			log.Debug("Closing southbound stream")
			strm.strm.CloseSend()
		}
	}
}

func (strms *beClStrms) trailer() metadata.MD {
	return strms.actvStrm.strm.Trailer()
}

func (bec *backendCluster) assignBackend(src grpc.ServerStream, f *nbFrame) (*backend, error) {
	// Receive the first message from the server. This calls the assigned codec in which
	// Unmarshal gets executed. That will use the assigned router to select a backend
	// and add it to the frame.
	if err := src.RecvMsg(f); err != nil {
		return nil, err
	}
	// Check that the backend was routable and actually has connections open.
	// If it doesn't then return a nil backend to indicate this.
	if f.be == nil {
		err := fmt.Errorf("Unable to route method '%s'", f.mthdSlice[REQ_METHOD])
		log.Error(err)
		return nil, err
	} else if f.be.opnConns == 0 {
		err := fmt.Errorf("No open connections on backend '%s'", f.be.name)
		log.Error(err)
		return f.be, err
	}
	return f.be, nil
}
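
// A note on the RecvMsg(f) call above: f is a *nbFrame, not a protobuf
// message. The custom codec installed at dial time (grpc.WithCodec(Codec())
// in connect()) is handed the raw wire bytes in Unmarshal, runs the router to
// pick a backend, records it in f.be, and retains the payload for
// retransmission southbound. That is what lets routing inspect the first
// message before any backend stream exists.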

func (strms *beClStrms) getActive() *beClStrm {
	strms.lck.Lock()
	defer strms.lck.Unlock()
	return strms.actvStrm
}

func (strms *beClStrms) setThenGetActive(strm *beClStrm) *beClStrm {
	strms.lck.Lock()
	defer strms.lck.Unlock()
	if strms.actvStrm == nil {
		strms.actvStrm = strm
	}
	return strms.actvStrm
}

func (src *beClStrms) forwardClientToServer(dst grpc.ServerStream, f *sbFrame) chan error {
	fc2s := func(srcS *beClStrm) {
		for i := 0; ; i++ {
			if err := srcS.strm.RecvMsg(f); err != nil {
				if src.setThenGetActive(srcS) == srcS {
					srcS.c2sRtrn <- err // this can be io.EOF which is the success case
				} else {
					srcS.c2sRtrn <- nil // Inactive responder
				}
				close(srcS.ok2Close)
				break
			}
			if src.setThenGetActive(srcS) != srcS {
				srcS.c2sRtrn <- nil
				continue
			}
			if i == 0 {
				// This is a bit of a hack, but client to server headers are only readable after first client msg is
				// received but must be written to server stream before the first msg is flushed.
				// This is the only place to do it nicely.
				md, err := srcS.strm.Header()
				if err != nil {
					srcS.c2sRtrn <- err
					break
				}
				// Update the metadata for the response.
				if f.metaKey != NoMeta {
					if f.metaVal == "" {
						// We could also always just do this
						md.Set(f.metaKey, f.be.name)
					} else {
						md.Set(f.metaKey, f.metaVal)
					}
				}
				if err := dst.SendHeader(md); err != nil {
					srcS.c2sRtrn <- err
					break
				}
			}
			log.Debugf("Northbound frame %v", f.payload)
			if err := dst.SendMsg(f); err != nil {
				srcS.c2sRtrn <- err
				break
			}
		}
	}

	// There should be AT LEAST one open stream at this point.
	// If there isn't, it's a grave error in the code and it will
	// cause this thread to block here, so check for it and
	// don't let the lockup happen but report the error.
	ret := make(chan error, 1)
	agg := make(chan *beClStrm)
	atLeastOne := false
	for _, strm := range src.strms {
		if strm != nil {
			go fc2s(strm)
			go func(s *beClStrm) { // Wait on result and aggregate
				r := <-s.c2sRtrn // got the return code
				if r == nil {
					return // We're the redundant stream, just die
				}
				s.c2sRtrn <- r // put it back to pass it along
				agg <- s       // send the stream to the aggregator
			}(strm)
			atLeastOne = true
		}
	}
	if atLeastOne == true {
		go func() { // Wait on aggregated result
			s := <-agg
			ret <- <-s.c2sRtrn
		}()
	} else {
		err := errors.New("There are no open streams. Unable to forward message.")
		log.Error(err)
		ret <- err
	}
	return ret
}
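
// setThenGetActive gives first-responder-wins arbitration between redundant
// southbound streams: the first stream to deliver (or fail) a receive claims
// actvStrm, only its traffic is forwarded northbound, and the other stream's
// results are discarded as nil. Each stream's ok2Close channel is closed once
// its receive loop ends, which is the event closeSend blocks on before
// calling CloseSend.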

func (strms *beClStrms) sendAll(f *nbFrame) error {
	var rtrn error

	atLeastOne := false
	for _, strm := range strms.srtdStrms {
		if strm != nil {
			if err := strm.strm.SendMsg(f); err != nil {
				log.Debugf("Error on SendMsg: %s", err.Error())
				strm.s2cRtrn = err
			}
			atLeastOne = true
		} else {
			log.Debugf("Nil stream")
		}
	}
	// If one of the streams succeeded, declare success;
	// if none did, pick an error and return it.
	if atLeastOne == true {
		for _, strm := range strms.srtdStrms {
			if strm != nil {
				rtrn = strm.s2cRtrn
				if rtrn == nil {
					return rtrn
				}
			}
		}
		return rtrn
	} else {
		rtrn = errors.New("There are no open streams, this should never happen")
		log.Error(rtrn)
	}
	return rtrn
}

func (dst *beClStrms) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error {
	ret := make(chan error, 1)
	go func() {
		// The frame buffer already has the results of a first
		// RecvMsg in it so the first thing to do is to
		// send it to the list of client streams and only
		// then read some more.
		for i := 0; ; i++ {
			// Send the message to each of the backend streams
			if err := dst.sendAll(f); err != nil {
				ret <- err
				log.Debugf("SendAll failed %s", err.Error())
				break
			}
			log.Debugf("Southbound frame %v", f.payload)
			if err := src.RecvMsg(f); err != nil {
				ret <- err // this can be io.EOF which is happy case
				break
			}
		}
	}()
	return ret
}

func (st *beClStrms) sortStreams() {
	var tmpKeys []string
	for k := range st.strms {
		tmpKeys = append(tmpKeys, k)
	}
	sort.Strings(tmpKeys)
	for _, v := range tmpKeys {
		st.srtdStrms = append(st.srtdStrms, st.strms[v])
	}
}

func (be *backend) sortConns() {
	var tmpKeys []string
	for k := range be.connections {
		tmpKeys = append(tmpKeys, k)
	}
	sort.Strings(tmpKeys)
	for _, v := range tmpKeys {
		be.srtdConns = append(be.srtdConns, be.connections[v])
	}
}
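
// Both sort helpers exist because Go deliberately randomizes map iteration
// order; ranging over be.connections or st.strms directly would visit the
// entries in a different order on every request. Sorting the keys once into
// srtdConns/srtdStrms gives every request the same deterministic connection
// order, which matters to the "open only the first southbound stream" hack
// in openSouthboundStreams.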

func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
	var rtrn_err bool = false

	log.Debugf("Configuring the backend with %v", *conf)
	// Validate the config and configure the backend
	be := &backend{name: conf.Name, connections: make(map[string]*beConnection), opnConns: 0}
	idx := strIndex(beTypeNames, conf.Type)
	if idx == 0 {
		log.Errorf("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.beType = idx

	idx = strIndex(asTypeNames, conf.Association.Strategy)
	if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
		log.Errorf("An association strategy must be provided if the backend "+
			"type is active/active for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.activeAssoc.strategy = idx

	idx = strIndex(alTypeNames, conf.Association.Location)
	if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
		log.Errorf("An association location must be provided if the backend "+
			"type is active/active for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.activeAssoc.location = idx

	if conf.Association.Field == "" && be.activeAssoc.location == AL_PROTOBUF {
		log.Errorf("An association field must be provided if the backend "+
			"type is active/active and the location is set to protobuf "+
			"for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.activeAssoc.field = conf.Association.Field

	if conf.Association.Key == "" && be.activeAssoc.location == AL_HEADER {
		log.Errorf("An association key must be provided if the backend "+
			"type is active/active and the location is set to header "+
			"for backend %s in cluster %s", conf.Name, clusterName)
		rtrn_err = true
	}
	be.activeAssoc.key = conf.Association.Key
	if rtrn_err {
		return nil, errors.New("Backend configuration failed")
	}
	// Configure the connections
	// Connections can consist of just a name. This allows for dynamic configuration
	// at a later time.
	// TODO: validate that there is one connection for all but active/active backends
	if len(conf.Connections) > 1 && be.beType != BE_ACTIVE_ACTIVE {
		log.Errorf("Only one connection must be specified if the backend " +
			"type is not active/active")
		rtrn_err = true
	}
	if len(conf.Connections) == 0 {
		log.Errorf("At least one connection must be specified")
		rtrn_err = true
	}
	for _, cnConf := range conf.Connections {
		if cnConf.Name == "" {
			log.Errorf("A connection must have a name for backend %s in cluster %s",
				conf.Name, clusterName)
		} else {
			gc := &gConnection{conn: nil, cncl: nil, state: connectivity.Idle}
			be.connections[cnConf.Name] = &beConnection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, bknd: be, gConn: gc}
			if cnConf.Addr != "" { // Otherwise the connection will be specified later
				if ip := net.ParseIP(cnConf.Addr); ip == nil {
					log.Errorf("The IP address for connection %s in backend %s in cluster %s is invalid",
						cnConf.Name, conf.Name, clusterName)
					rtrn_err = true
				}
				// Validate the port number. This just validates that it's a
				// non-zero integer in the valid port range.
				if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
					log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
						cnConf.Port, cnConf.Name, conf.Name, clusterName)
					rtrn_err = true
				}
			}
		}
	}

	if rtrn_err {
		return nil, errors.New("Connection configuration failed")
	}
	// Create the sorted connection list for deterministic
	// active-active call orders.
	be.sortConns()
	// All is well, start the backend cluster connections
	be.connectAll()

	return be, nil
}

//***************************************************************//
//********************* Backend Functions ***********************//
//***************************************************************//

func (be *backend) incConn() {
	be.lck.Lock()
	defer be.lck.Unlock()
	be.opnConns++
}

func (be *backend) decConn() {
	be.lck.Lock()
	defer be.lck.Unlock()
	be.opnConns--
	if be.opnConns < 0 {
		log.Error("Internal error, number of open connections less than 0")
		be.opnConns = 0
	}
}

// Attempts to establish all the connections for a backend;
// any failures result in an abort. This should only be called
// on a first attempt to connect. Individual connections should be
// handled after that.
func (be *backend) connectAll() {
	for _, cn := range be.connections {
		cn.connect()
	}
}

func (cn *beConnection) connect() {
	if cn.addr != "" && cn.getConn() == nil {
		log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name, cn.addr, cn.port)
		// Dial doesn't block, it just returns and continues connecting in the background.
		// Check back later to confirm and increase the connection count.
		ctx, cnclFnc := context.WithCancel(context.Background()) // Context for canceling the connection
		cn.setCncl(cnclFnc)
		if conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure()); err != nil {
			log.Errorf("Dialing connection %v:%v", cn, err)
			cn.waitAndTryAgain(ctx)
		} else {
			cn.setConn(conn)
			log.Debugf("Starting the connection monitor for '%s'", cn.name)
			cn.monitor(ctx)
		}
	} else if cn.addr == "" {
		log.Infof("No address supplied for connection '%s', not connecting for now", cn.name)
	} else {
		log.Debugf("Connection '%s' is already connected, ignoring", cn.name)
	}
}

func (cn *beConnection) waitAndTryAgain(ctx context.Context) {
	go func(ctx context.Context) {
		ctxTm, cnclTm := context.WithTimeout(context.Background(), 10*time.Second)
		select {
		case <-ctxTm.Done():
			cnclTm()
			log.Debugf("Trying to connect '%s'", cn.name)
			// Connect creates a new context so cancel this one.
			cn.cancel()
			cn.connect()
			return
		case <-ctx.Done():
			cnclTm()
			return
		}
	}(ctx)
}

func (cn *beConnection) cancel() {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	log.Debugf("Canceling connection %s", cn.name)
	if cn.gConn != nil {
		if cn.gConn.cncl != nil {
			cn.gConn.cncl()
		} else {
			log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
		}
	} else {
		log.Errorf("Internal error, attempting to cancel on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) setCncl(cncl context.CancelFunc) {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		cn.gConn.cncl = cncl
	} else {
		log.Errorf("Internal error, attempting to set a cancel function on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) setConn(conn *grpc.ClientConn) {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		cn.gConn.conn = conn
	} else {
		log.Errorf("Internal error, attempting to set a connection on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) getConn() *grpc.ClientConn {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		return cn.gConn.conn
	}
	return nil
}

func (cn *beConnection) close() {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	log.Debugf("Closing connection %s", cn.name)
	if cn.gConn != nil && cn.gConn.conn != nil {
		if cn.gConn.conn.GetState() == connectivity.Ready {
			cn.bknd.decConn() // Decrease the connection reference
		}
		if cn.gConn.cncl != nil {
			cn.gConn.cncl() // Cancel the context first to force monitor functions to exit
		} else {
			log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
		}
		cn.gConn.conn.Close() // Close the connection
		// Now replace the gConn object with a new one as this one just
		// fades away as references to it are released after the close
		// finishes in the background.
		cn.gConn = &gConnection{conn: nil, cncl: nil, state: connectivity.TransientFailure}
	} else {
		log.Errorf("Internal error, attempt to close a nil connection object for '%s'", cn.name)
	}
}

func (cn *beConnection) setState(st connectivity.State) {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		cn.gConn.state = st
	} else {
		log.Errorf("Internal error, attempting to set connection state on a nil connection object: '%s'", cn.name)
	}
}

func (cn *beConnection) getState() connectivity.State {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	if cn.gConn != nil {
		if cn.gConn.conn != nil {
			return cn.gConn.conn.GetState()
		} else {
			log.Errorf("Internal error, attempting to get connection state on a nil connection: '%s'", cn.name)
		}
	} else {
		log.Errorf("Internal error, attempting to get connection state on a nil connection object: '%s'", cn.name)
	}
	// For lack of a better state to use. The logs will help determine what happened here.
	return connectivity.TransientFailure
}

func (cn *beConnection) monitor(ctx context.Context) {
	bp := cn.bknd
	log.Debugf("Setting up monitoring for backend %s", bp.name)
	go func(ctx context.Context) {
		var delay time.Duration = 100 //ms
		for {
			//log.Debugf("****** Monitoring connection '%s' on backend '%s', %v", cn.name, bp.name, cn.conn)
			if cn.getState() == connectivity.Ready {
				log.Debugf("connection '%s' on backend '%s' becomes ready", cn.name, bp.name)
				cn.setState(connectivity.Ready)
				bp.incConn()
				if cn.getConn() != nil && cn.getConn().WaitForStateChange(ctx, connectivity.Ready) == false {
					// The context was canceled. This is done by the close function
					// so just exit the routine
					log.Debugf("Context canceled for connection '%s' on backend '%s'", cn.name, bp.name)
					return
				}
				if cs := cn.getConn(); cs != nil {
					switch cs := cn.getState(); cs {
					case connectivity.TransientFailure:
						cn.setState(cs)
						bp.decConn()
						log.Infof("Transient failure for connection '%s' on backend '%s'", cn.name, bp.name)
						delay = 100
					case connectivity.Shutdown:
						// The connection was closed. The assumption here is that the closer
						// will manage the connection count and setting the conn to nil.
						// Exit the routine
						log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, bp.name)
						return
					case connectivity.Idle:
						// This can only happen if the server sends a GoAway. This can
						// only happen if the server has modified MaxConnectionIdle from
						// its default of infinity. The only solution here is to close the
						// connection and keepTrying()?
						//TODO: Read the grpc source code to see if there's a different approach
						log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, bp.name)
						cn.close()
						cn.connect()
						return
					}
				} else { // A nil means something went horribly wrong, error and exit.
					log.Errorf("Something horrible happened, the connection is nil and shouldn't be for connection %s", cn.name)
					return
				}
			} else {
				log.Debugf("Waiting for connection '%s' on backend '%s' to become ready", cn.name, bp.name)
				ctxTm, cnclTm := context.WithTimeout(context.Background(), delay*time.Millisecond)
				if delay < 30000 {
					delay += delay
				}
				select {
				case <-ctxTm.Done():
					cnclTm() // Doubt this is required but it's harmless.
					// Do nothing but let the loop continue
				case <-ctx.Done():
					// Context was closed, close and exit routine
					//cn.close() NO! let the close be managed externally!
					return
				}
			}
		}
	}(ctx)
}
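
// The not-ready branch above implements simple exponential backoff: the wait
// starts at 100ms and doubles on every idle pass (100, 200, 400, ... ms),
// topping out at 51.2s since the last doubling fires at 25.6s, just under the
// 30000ms guard. delay is a time.Duration holding a bare millisecond count,
// so it must be scaled by time.Millisecond at the WithTimeout call site, as
// done above.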

// Set a callback for connection failure notification.
// This is currently not used.
func (bp *backend) setConnFailCallback(cb func(string, *backend) bool) {
	bp.connFailCallback = cb
}