blob: 19133a3dfd415061107d9482df514aaf6d3fda49 [file] [log] [blame]
sslobodr392ebd52019-01-18 12:41:49 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16// gRPC affinity router with active/active backends
17
18package afrouter
19
20// Backend manager handles redundant connections per backend
21
22import (
sslobodr392ebd52019-01-18 12:41:49 -050023 "errors"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040024 "fmt"
25 "github.com/opencord/voltha-go/common/log"
sslobodr392ebd52019-01-18 12:41:49 -050026 "golang.org/x/net/context"
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/codes"
sslobodr392ebd52019-01-18 12:41:49 -050029 "google.golang.org/grpc/connectivity"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040030 "google.golang.org/grpc/metadata"
31 "io"
32 "net"
33 "sort"
34 "strconv"
35 "strings"
36 "sync"
37 "time"
sslobodr392ebd52019-01-18 12:41:49 -050038)
39
sslobodr392ebd52019-01-18 12:41:49 -050040const (
41 BE_ACTIVE_ACTIVE = 1 // Backend type active/active
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040042 BE_SERVER = 2 // Backend type single server
43 BE_SEQ_RR = 0 // Backend sequence round robin
44 AS_NONE = 0 // Association strategy: none
45 AS_SERIAL_NO = 1 // Association strategy: serial number
46 AL_NONE = 0 // Association location: none
47 AL_HEADER = 1 // Association location: header
48 AL_PROTOBUF = 2 // Association location: protobuf
sslobodr392ebd52019-01-18 12:41:49 -050049)
50
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040051var beTypeNames = []string{"", "active_active", "server"}
52var asTypeNames = []string{"", "serial_number"}
53var alTypeNames = []string{"", "header", "protobuf"}
sslobodr392ebd52019-01-18 12:41:49 -050054
55var bClusters map[string]*backendCluster = make(map[string]*backendCluster)
56
// backendCluster is a named group of backends serving a router's traffic.
// Backends are selected round-robin (see nextBackend) and every transaction
// is tagged with a serial number drawn from serialNoSource.
type backendCluster struct {
	name string
	//backends map[string]*backend
	backends       []*backend       // ordered list used for round-robin selection
	beRvMap        map[*backend]int // reverse map: backend -> index in backends
	serialNoSource chan uint64      // monotonically increasing transaction serial numbers
}
64
// backend is a single backend of a cluster, possibly holding multiple
// redundant connections when the type is active/active.
type backend struct {
	lck              sync.Mutex // guards opnConns
	name             string
	beType           int   // BE_ACTIVE_ACTIVE or BE_SERVER
	activeAssoc      assoc // affinity association config (active/active only)
	connFailCallback func(string, *backend) bool
	connections      map[string]*beConnection // keyed by connection name
	opnConns         int                      // count of currently open connections
}
74
// assoc describes how requests are associated with a specific backend
// instance in an active/active cluster.
type assoc struct {
	strategy int    // AS_* constant
	location int    // AL_* constant
	field    string // Used only if location is protobuf
	key      string // used only if location is header
}
81
// beConnection is a single gRPC connection belonging to a backend.
type beConnection struct {
	lck   sync.Mutex         // guards cncl and connection state transitions
	cncl  context.CancelFunc // cancels the dial/monitor context
	name  string
	addr  string // IP address; empty means "to be configured later"
	port  string
	gConn *gConnection // underlying gRPC connection state
	bknd  *backend     // owning backend
}
91
// This structure should never be referred to
// by any routine outside of *beConnection
// routines.
type gConnection struct {
	lck   sync.Mutex
	state connectivity.State // last observed connectivity state
	conn  *grpc.ClientConn
	cncl  context.CancelFunc
}
101
// beClStrm is one southbound client stream plus the plumbing used to
// forward traffic and collect its outcome.
type beClStrm struct {
	strm     grpc.ClientStream
	ctxt     context.Context
	cncl     context.CancelFunc
	ok2Close chan struct{} // closed when it is safe to CloseSend this stream
	c2sRtrn  chan error    // client-to-server forwarding result
	s2cRtrn  error         // server-to-client (sendAll) error, if any
}
110
// beClStrms is the set of southbound streams opened for one northbound
// request. At most one becomes the "active" stream whose responses are
// forwarded back northbound; the rest are redundant.
type beClStrms struct {
	lck       sync.Mutex           // guards actvStrm
	actvStrm  *beClStrm            // first stream to respond; nil until then
	strms     map[string]*beClStrm // keyed by connection name; nil entries mark failed streams
	srtdStrms []*beClStrm          // strms values sorted by name for deterministic sends
}
117
118//***************************************************************//
119//****************** BackendCluster Functions *******************//
120//***************************************************************//
121
122//TODO: Move the backend type (active/active etc) to the cluster
123// level. All backends should really be of the same type.
124// Create a new backend cluster
sslobodrcd37bc52019-01-24 11:47:16 -0500125func newBackendCluster(conf *BackendClusterConfig) (*backendCluster, error) {
sslobodr392ebd52019-01-18 12:41:49 -0500126 var err error = nil
127 var rtrn_err bool = false
128 var be *backend
sslobodr8e2ccb52019-02-05 09:21:47 -0500129 log.Debugf("Creating a backend cluster with %v", conf)
sslobodr392ebd52019-01-18 12:41:49 -0500130 // Validate the configuration
131 if conf.Name == "" {
132 log.Error("A backend cluster must have a name")
133 rtrn_err = true
134 }
135 //bc := &backendCluster{name:conf.Name,backends:make(map[string]*backend)}
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400136 bc := &backendCluster{name: conf.Name, beRvMap: make(map[*backend]int)}
sslobodr392ebd52019-01-18 12:41:49 -0500137 bClusters[bc.name] = bc
138 bc.startSerialNumberSource() // Serial numberere for active/active backends
139 idx := 0
sslobodr5f0b5a32019-01-24 07:45:19 -0500140 for _, bec := range conf.Backends {
sslobodr392ebd52019-01-18 12:41:49 -0500141 if bec.Name == "" {
142 log.Errorf("A backend must have a name in cluster %s\n", conf.Name)
143 rtrn_err = true
144 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400145 if be, err = newBackend(&bec, conf.Name); err != nil {
sslobodr392ebd52019-01-18 12:41:49 -0500146 log.Errorf("Error creating backend %s", bec.Name)
147 rtrn_err = true
148 }
149 bc.backends = append(bc.backends, be)
150 bc.beRvMap[bc.backends[idx]] = idx
151 idx++
152 }
153 if rtrn_err {
154 return nil, errors.New("Error creating backend(s)")
155 }
156 return bc, nil
157}
158
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400159func (bc *backendCluster) getBackend(name string) *backend {
160 for _, v := range bc.backends {
sslobodr392ebd52019-01-18 12:41:49 -0500161 if v.name == name {
162 return v
163 }
164 }
165 return nil
166}
167
168func (bc *backendCluster) startSerialNumberSource() {
169 bc.serialNoSource = make(chan uint64)
170 var counter uint64 = 0
171 // This go routine only dies on exit, it is not a leak
172 go func() {
173 for {
174 bc.serialNoSource <- counter
175 counter++
176 }
177 }()
178}
179
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400180func (bc *backendCluster) nextBackend(be *backend, seq int) (*backend, error) {
sslobodr392ebd52019-01-18 12:41:49 -0500181 switch seq {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400182 case BE_SEQ_RR: // Round robin
183 in := be
184 // If no backend is found having a connection
185 // then return nil.
186 if be == nil {
187 log.Debug("Previous backend is nil")
188 be = bc.backends[0]
189 in = be
190 if be.opnConns != 0 {
191 return be, nil
sslobodr392ebd52019-01-18 12:41:49 -0500192 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400193 }
194 for {
195 log.Debugf("Requesting a new backend starting from %s", be.name)
196 cur := bc.beRvMap[be]
197 cur++
198 if cur >= len(bc.backends) {
199 cur = 0
sslobodr392ebd52019-01-18 12:41:49 -0500200 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400201 log.Debugf("Next backend is %d:%s", cur, bc.backends[cur].name)
202 if bc.backends[cur].opnConns > 0 {
203 return bc.backends[cur], nil
204 }
205 if bc.backends[cur] == in {
206 err := fmt.Errorf("No backend with open connections found")
207 log.Debug(err)
208 return nil, err
209 }
210 be = bc.backends[cur]
211 log.Debugf("Backend '%s' has no open connections, trying next", bc.backends[cur].name)
212 }
213 default: // Invalid, defalt to routnd robin
214 log.Errorf("Invalid backend sequence %d. Defaulting to round robin", seq)
215 return bc.nextBackend(be, BE_SEQ_RR)
sslobodr392ebd52019-01-18 12:41:49 -0500216 }
217}
218
// handler is the cluster-level gRPC proxy entry point. It allocates the
// northbound frame that carries the routing context, lets the router pick a
// backend from the first received message (assignBackend), and then hands
// the bidirectional proxying off to the selected backend's handler.
func (bec *backendCluster) handler(srv interface{}, serverStream grpc.ServerStream, r Router, mthdSlice []string,
	mk string, mv string) error {
	//func (bec *backendCluster) handler(nbR * nbRequest) error {

	// The final backend cluster needs to be determined here. With non-affinity routed backends it could
	// just be determined here and for affinity routed backends the first message must be received
	// before the backend is determined. In order to keep things simple, the same approach is taken for
	// now.

	// Get the backend to use.
	// Allocate the nbFrame here since it holds the "context" of this communication
	nf := &nbFrame{router: r, mthdSlice: mthdSlice, serNo: bec.serialNoSource, metaKey: mk, metaVal: mv}
	log.Debugf("Nb frame allocate with method %s", nf.mthdSlice[REQ_METHOD])

	if be, err := bec.assignBackend(serverStream, nf); err != nil {
		// At this point, no backend streams have been initiated
		// so just return the error.
		return err
	} else {
		log.Debugf("Backend '%s' selected", be.name)
		// Allocate a sbFrame here because it might be needed for return value intercept
		sf := &sbFrame{router: r, be: be, method: nf.mthdSlice[REQ_METHOD], metaKey: mk, metaVal: mv}
		log.Debugf("Sb frame allocated with router %s", r.Name())
		return be.handler(srv, serverStream, nf, sf)
	}
}
245
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400246func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f *nbFrame) (*beClStrms, error) {
sslobodr392ebd52019-01-18 12:41:49 -0500247
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400248 rtrn := &beClStrms{strms: make(map[string]*beClStrm), actvStrm: nil}
sslobodr392ebd52019-01-18 12:41:49 -0500249
250 log.Debugf("Opening southbound streams for method '%s'", f.mthdSlice[REQ_METHOD])
251 // Get the metadata from the incoming message on the server
252 md, ok := metadata.FromIncomingContext(serverStream.Context())
253 if !ok {
254 return nil, errors.New("Could not get a server stream metadata")
255 }
256
257 // TODO: Need to check if this is an active/active backend cluster
258 // with a serial number in the header.
259 serialNo := <-f.serNo
260 log.Debugf("Serial number for transaction allocated: %d", serialNo)
261 // If even one stream can be created then proceed. If none can be
262 // created then report an error becase both the primary and redundant
263 // connections are non-existant.
264 var atLeastOne bool = false
265 var errStr strings.Builder
266 log.Debugf("There are %d connections to open", len(be.connections))
Kent Hagerman1b9c7062019-05-07 16:46:01 -0400267 for _, cn := range be.connections {
sslobodr392ebd52019-01-18 12:41:49 -0500268 // Copy in the metadata
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400269 if cn.getState() == connectivity.Ready && cn.getConn() != nil {
sslobodr63d160c2019-02-08 14:25:13 -0500270 log.Debugf("Opening southbound stream for connection '%s'", cn.name)
sslobodr392ebd52019-01-18 12:41:49 -0500271 // Create an outgoing context that includes the incoming metadata
272 // and that will cancel if the server's context is canceled
273 clientCtx, clientCancel := context.WithCancel(serverStream.Context())
274 clientCtx = metadata.NewOutgoingContext(clientCtx, md.Copy())
275 //TODO: Same check here, only add the serial number if necessary
276 clientCtx = metadata.AppendToOutgoingContext(clientCtx, "voltha_serial_number",
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400277 strconv.FormatUint(serialNo, 10))
sslobodr392ebd52019-01-18 12:41:49 -0500278 // Create the client stream
279 if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying,
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400280 cn.getConn(), f.mthdSlice[REQ_ALL]); err != nil {
281 log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
sslobodr392ebd52019-01-18 12:41:49 -0500282 fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err)
sslobodr63d160c2019-02-08 14:25:13 -0500283 rtrn.strms[cn.name] = nil
sslobodr392ebd52019-01-18 12:41:49 -0500284 } else {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400285 rtrn.strms[cn.name] = &beClStrm{strm: clientStream, ctxt: clientCtx,
286 cncl: clientCancel, s2cRtrn: nil,
287 ok2Close: make(chan struct{}),
288 c2sRtrn: make(chan error, 1)}
sslobodr392ebd52019-01-18 12:41:49 -0500289 atLeastOne = true
290 }
291 } else if cn.getConn() == nil {
292 err := errors.New(fmt.Sprintf("Connection '%s' is closed", cn.name))
293 fmt.Fprint(&errStr, err.Error())
294 log.Debug(err)
295 } else {
296 err := errors.New(fmt.Sprintf("Connection '%s' isn't ready", cn.name))
297 fmt.Fprint(&errStr, err.Error())
298 log.Debug(err)
299 }
300 }
301 if atLeastOne == true {
sslobodr63d160c2019-02-08 14:25:13 -0500302 rtrn.sortStreams()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400303 return rtrn, nil
sslobodr392ebd52019-01-18 12:41:49 -0500304 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400305 fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ", be.name)
sslobodr392ebd52019-01-18 12:41:49 -0500306 log.Error(errStr.String())
307 return nil, errors.New(errStr.String())
308}
309
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400310func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *nbFrame, sf *sbFrame) error {
sslobodr392ebd52019-01-18 12:41:49 -0500311
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400312 // Set up and launch each individual southbound stream
sslobodr392ebd52019-01-18 12:41:49 -0500313 var beStrms *beClStrms
sslobodr63d160c2019-02-08 14:25:13 -0500314 var rtrn error = nil
315 var s2cOk bool = false
316 var c2sOk bool = false
sslobodr392ebd52019-01-18 12:41:49 -0500317
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400318 beStrms, err := be.openSouthboundStreams(srv, serverStream, nf)
sslobodr392ebd52019-01-18 12:41:49 -0500319 if err != nil {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400320 log.Errorf("openStreams failed: %v", err)
sslobodr392ebd52019-01-18 12:41:49 -0500321 return err
322 }
323 // If we get here, there has to be AT LEAST ONE open stream
324
325 // *Do not explicitly close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
326 // Channels do not have to be closed, it is just a control flow mechanism, see
327 // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
328
329 log.Debug("Starting server to client forwarding")
330 s2cErrChan := beStrms.forwardServerToClient(serverStream, nf)
331
332 log.Debug("Starting client to server forwarding")
333 c2sErrChan := beStrms.forwardClientToServer(serverStream, sf)
334
335 // We don't know which side is going to stop sending first, so we need a select between the two.
336 for i := 0; i < 2; i++ {
337 select {
338 case s2cErr := <-s2cErrChan:
sslobodr63d160c2019-02-08 14:25:13 -0500339 s2cOk = true
sslobodr392ebd52019-01-18 12:41:49 -0500340 log.Debug("Processing s2cErr")
341 if s2cErr == io.EOF {
342 log.Debug("s2cErr reporting EOF")
343 // this is the successful case where the sender has encountered io.EOF, and won't be sending anymore./
344 // the clientStream>serverStream may continue sending though.
sslobodr63d160c2019-02-08 14:25:13 -0500345 defer beStrms.closeSend()
346 if c2sOk == true {
347 return rtrn
348 }
sslobodr392ebd52019-01-18 12:41:49 -0500349 } else {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400350 log.Debugf("s2cErr reporting %v", s2cErr)
sslobodr392ebd52019-01-18 12:41:49 -0500351 // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
352 // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
353 // exit with an error to the stack
354 beStrms.clientCancel()
355 return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
356 }
357 case c2sErr := <-c2sErrChan:
sslobodr63d160c2019-02-08 14:25:13 -0500358 c2sOk = true
sslobodr392ebd52019-01-18 12:41:49 -0500359 log.Debug("Processing c2sErr")
360 // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
361 // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
362 // will be nil.
363 serverStream.SetTrailer(beStrms.trailer())
364 // c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
sslobodr63d160c2019-02-08 14:25:13 -0500365 // NOTE!!! with redundant backends, it's likely that one of the backends
366 // returns a response before all the data has been sent southbound and
367 // the southbound streams are closed. Should this happen one of the
368 // backends may not get the request.
sslobodr392ebd52019-01-18 12:41:49 -0500369 if c2sErr != io.EOF {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400370 rtrn = c2sErr
sslobodr392ebd52019-01-18 12:41:49 -0500371 }
sslobodr63d160c2019-02-08 14:25:13 -0500372 log.Debug("c2sErr reporting EOF")
373 if s2cOk == true {
374 return rtrn
375 }
sslobodr392ebd52019-01-18 12:41:49 -0500376 }
377 }
378 return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
379}
380
381func (strms *beClStrms) clientCancel() {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400382 for _, strm := range strms.strms {
sslobodr392ebd52019-01-18 12:41:49 -0500383 if strm != nil {
384 strm.cncl()
385 }
386 }
387}
388
389func (strms *beClStrms) closeSend() {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400390 for _, strm := range strms.strms {
sslobodr392ebd52019-01-18 12:41:49 -0500391 if strm != nil {
sslobodr63d160c2019-02-08 14:25:13 -0500392 <-strm.ok2Close
sslobodr392ebd52019-01-18 12:41:49 -0500393 log.Debug("Closing southbound stream")
394 strm.strm.CloseSend()
395 }
396 }
397}
398
// trailer returns the gRPC trailer metadata of the active (first-responding)
// southbound stream.
func (strms *beClStrms) trailer() metadata.MD {
	return strms.actvStrm.strm.Trailer()
}
402
403func (bec *backendCluster) assignBackend(src grpc.ServerStream, f *nbFrame) (*backend, error) {
404 // Receive the first message from the server. This calls the assigned codec in which
405 // Unmarshal gets executed. That will use the assigned router to select a backend
406 // and add it to the frame
407 if err := src.RecvMsg(f); err != nil {
408 return nil, err
409 }
410 // Check that the backend was routable and actually has connections open.
411 // If it doesn't then return a nil backend to indicate this
412 if f.be == nil {
413 err := fmt.Errorf("Unable to route method '%s'", f.mthdSlice[REQ_METHOD])
414 log.Error(err)
415 return nil, err
416 } else if f.be.opnConns == 0 {
417 err := fmt.Errorf("No open connections on backend '%s'", f.be.name)
418 log.Error(err)
419 return f.be, err
420 }
421 return f.be, nil
422}
423
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400424func (strms *beClStrms) getActive() *beClStrm {
sslobodr392ebd52019-01-18 12:41:49 -0500425 strms.lck.Lock()
426 defer strms.lck.Unlock()
427 return strms.actvStrm
428}
429
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400430func (strms *beClStrms) setThenGetActive(strm *beClStrm) *beClStrm {
sslobodr392ebd52019-01-18 12:41:49 -0500431 strms.lck.Lock()
432 defer strms.lck.Unlock()
433 if strms.actvStrm == nil {
434 strms.actvStrm = strm
435 }
436 return strms.actvStrm
437}
438
// forwardClientToServer pumps responses from every southbound client stream
// back to the northbound server stream. Only the "active" stream — the
// first to produce a result, chosen via setThenGetActive — has its headers
// and messages forwarded; the redundant streams are drained and their
// results discarded. The returned channel yields the single aggregated
// result, which can be io.EOF on success.
func (src *beClStrms) forwardClientToServer(dst grpc.ServerStream, f *sbFrame) chan error {
	// fc2s is the per-stream receive loop.
	fc2s := func(srcS *beClStrm) {
		for i := 0; ; i++ {
			if err := srcS.strm.RecvMsg(f); err != nil {
				if src.setThenGetActive(srcS) == srcS {
					srcS.c2sRtrn <- err // this can be io.EOF which is the success case
				} else {
					srcS.c2sRtrn <- nil // Inactive responder
				}
				// Signal closeSend that this stream is done receiving.
				close(srcS.ok2Close)
				break
			}
			if src.setThenGetActive(srcS) != srcS {
				srcS.c2sRtrn <- nil
				continue
			}
			if i == 0 {
				// This is a bit of a hack, but client to server headers are only readable after first client msg is
				// received but must be written to server stream before the first msg is flushed.
				// This is the only place to do it nicely.
				md, err := srcS.strm.Header()
				if err != nil {
					srcS.c2sRtrn <- err
					break
				}
				// Update the metadata for the response.
				if f.metaKey != NoMeta {
					if f.metaVal == "" {
						// We could also always just do this
						md.Set(f.metaKey, f.be.name)
					} else {
						md.Set(f.metaKey, f.metaVal)
					}
				}
				if err := dst.SendHeader(md); err != nil {
					srcS.c2sRtrn <- err
					break
				}
			}
			log.Debugf("Northbound frame %v", f.payload)
			if err := dst.SendMsg(f); err != nil {
				srcS.c2sRtrn <- err
				break
			}
		}
	}

	// There should be AT LEAST one open stream at this point
	// if there isn't its a grave error in the code and it will
	// cause this thread to block here so check for it and
	// don't let the lock up happen but report the error
	ret := make(chan error, 1)
	agg := make(chan *beClStrm)
	atLeastOne := false
	for _, strm := range src.strms {
		if strm != nil {
			go fc2s(strm)
			go func(s *beClStrm) { // Wait on result and aggregate
				r := <-s.c2sRtrn // got the return code
				if r == nil {
					return // We're the redundant stream, just die
				}
				s.c2sRtrn <- r // put it back to pass it along
				agg <- s       // send the stream to the aggregator
			}(strm)
			atLeastOne = true
		}
	}
	if atLeastOne == true {
		go func() { // Wait on aggregated result
			s := <-agg
			ret <- <-s.c2sRtrn
		}()
	} else {
		err := errors.New("There are no open streams. Unable to forward message.")
		log.Error(err)
		ret <- err
	}
	return ret
}
519
520func (strms *beClStrms) sendAll(f *nbFrame) error {
521 var rtrn error
522
523 atLeastOne := false
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400524 for _, strm := range strms.srtdStrms {
sslobodr392ebd52019-01-18 12:41:49 -0500525 if strm != nil {
526 if err := strm.strm.SendMsg(f); err != nil {
sslobodr63d160c2019-02-08 14:25:13 -0500527 log.Debugf("Error on SendMsg: %s", err.Error())
sslobodr392ebd52019-01-18 12:41:49 -0500528 strm.s2cRtrn = err
529 }
530 atLeastOne = true
sslobodr63d160c2019-02-08 14:25:13 -0500531 } else {
532 log.Debugf("Nil stream")
sslobodr392ebd52019-01-18 12:41:49 -0500533 }
534 }
535 // If one of the streams succeeded, declare success
536 // if none did pick an error and return it.
537 if atLeastOne == true {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400538 for _, strm := range strms.srtdStrms {
sslobodr392ebd52019-01-18 12:41:49 -0500539 if strm != nil {
540 rtrn = strm.s2cRtrn
541 if rtrn == nil {
542 return rtrn
543 }
544 }
545 }
546 return rtrn
547 } else {
548 rtrn = errors.New("There are no open streams, this should never happen")
549 log.Error(rtrn)
550 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400551 return rtrn
sslobodr392ebd52019-01-18 12:41:49 -0500552}
553
// forwardServerToClient pumps messages from the northbound server stream to
// all southbound client streams. The frame f already holds the first
// received message (from assignBackend's RecvMsg), so the loop sends before
// it reads. The returned channel yields the terminating error, which can be
// io.EOF on normal completion.
func (dst *beClStrms) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error {
	ret := make(chan error, 1)
	go func() {
		// The frame buffer already has the results of a first
		// RecvMsg in it so the first thing to do is to
		// send it to the list of client streams and only
		// then read some more.
		for i := 0; ; i++ {
			// Send the message to each of the backend streams
			if err := dst.sendAll(f); err != nil {
				ret <- err
				log.Debugf("SendAll failed %s", err.Error())
				break
			}
			log.Debugf("Southbound frame %v", f.payload)
			if err := src.RecvMsg(f); err != nil {
				ret <- err // this can be io.EOF which is happy case
				break
			}
		}
	}()
	return ret
}
577
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400578func (st *beClStrms) sortStreams() {
sslobodr63d160c2019-02-08 14:25:13 -0500579 var tmpKeys []string
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400580 for k, _ := range st.strms {
sslobodr63d160c2019-02-08 14:25:13 -0500581 tmpKeys = append(tmpKeys, k)
582 }
583 sort.Strings(tmpKeys)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400584 for _, v := range tmpKeys {
sslobodr63d160c2019-02-08 14:25:13 -0500585 st.srtdStrms = append(st.srtdStrms, st.strms[v])
586 }
587}
588
sslobodr392ebd52019-01-18 12:41:49 -0500589func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
590 var rtrn_err bool = false
591
592 log.Debugf("Configuring the backend with %v", *conf)
593 // Validate the conifg and configure the backend
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400594 be := &backend{name: conf.Name, connections: make(map[string]*beConnection), opnConns: 0}
595 idx := strIndex([]string(beTypeNames), conf.Type)
sslobodr392ebd52019-01-18 12:41:49 -0500596 if idx == 0 {
597 log.Error("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName)
598 rtrn_err = true
599 }
600 be.beType = idx
601
602 idx = strIndex(asTypeNames, conf.Association.Strategy)
603 if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
604 log.Errorf("An association strategy must be provided if the backend "+
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400605 "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
sslobodr392ebd52019-01-18 12:41:49 -0500606 rtrn_err = true
607 }
608 be.activeAssoc.strategy = idx
609
610 idx = strIndex(alTypeNames, conf.Association.Location)
611 if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
612 log.Errorf("An association location must be provided if the backend "+
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400613 "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
sslobodr392ebd52019-01-18 12:41:49 -0500614 rtrn_err = true
615 }
616 be.activeAssoc.location = idx
617
618 if conf.Association.Field == "" && be.activeAssoc.location == AL_PROTOBUF {
619 log.Errorf("An association field must be provided if the backend "+
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400620 "type is active/active and the location is set to protobuf "+
621 "for backend %s in cluster %s", conf.Name, clusterName)
sslobodr392ebd52019-01-18 12:41:49 -0500622 rtrn_err = true
623 }
624 be.activeAssoc.field = conf.Association.Field
sslobodr8e2ccb52019-02-05 09:21:47 -0500625
626 if conf.Association.Key == "" && be.activeAssoc.location == AL_HEADER {
627 log.Errorf("An association key must be provided if the backend "+
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400628 "type is active/active and the location is set to header "+
629 "for backend %s in cluster %s", conf.Name, clusterName)
sslobodr8e2ccb52019-02-05 09:21:47 -0500630 rtrn_err = true
631 }
632 be.activeAssoc.key = conf.Association.Key
sslobodr392ebd52019-01-18 12:41:49 -0500633 if rtrn_err {
634 return nil, errors.New("Backend configuration failed")
635 }
636 // Configure the connections
637 // Connections can consist of just a name. This allows for dynamic configuration
638 // at a later time.
639 // TODO: validate that there is one connection for all but active/active backends
sslobodr8e2ccb52019-02-05 09:21:47 -0500640 if len(conf.Connections) > 1 && be.activeAssoc.strategy != BE_ACTIVE_ACTIVE {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400641 log.Errorf("Only one connection must be specified if the association " +
642 "strategy is not set to 'active_active'")
sslobodr8e2ccb52019-02-05 09:21:47 -0500643 rtrn_err = true
644 }
645 if len(conf.Connections) == 0 {
646 log.Errorf("At least one connection must be specified")
647 rtrn_err = true
648 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400649 for _, cnConf := range conf.Connections {
sslobodr392ebd52019-01-18 12:41:49 -0500650 if cnConf.Name == "" {
651 log.Errorf("A connection must have a name for backend %s in cluster %s",
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400652 conf.Name, clusterName)
sslobodr392ebd52019-01-18 12:41:49 -0500653 } else {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400654 gc := &gConnection{conn: nil, cncl: nil, state: connectivity.Idle}
655 be.connections[cnConf.Name] = &beConnection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, bknd: be, gConn: gc}
sslobodr392ebd52019-01-18 12:41:49 -0500656 if cnConf.Addr != "" { // This connection will be specified later.
657 if ip := net.ParseIP(cnConf.Addr); ip == nil {
658 log.Errorf("The IP address for connection %s in backend %s in cluster %s is invalid",
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400659 cnConf.Name, conf.Name, clusterName)
sslobodr392ebd52019-01-18 12:41:49 -0500660 rtrn_err = true
661 }
662 // Validate the port number. This just validtes that it's a non 0 integer
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400663 if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
sslobodr392ebd52019-01-18 12:41:49 -0500664 log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
665 cnConf.Port, cnConf.Name, conf.Name, clusterName)
666 rtrn_err = true
667 } else {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400668 if n <= 0 && n > 65535 {
sslobodr392ebd52019-01-18 12:41:49 -0500669 log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
670 cnConf.Port, cnConf.Name, conf.Name, clusterName)
671 rtrn_err = true
672 }
673 }
674 }
675 }
676 }
sslobodr63d160c2019-02-08 14:25:13 -0500677
sslobodr392ebd52019-01-18 12:41:49 -0500678 if rtrn_err {
679 return nil, errors.New("Connection configuration failed")
680 }
sslobodr392ebd52019-01-18 12:41:49 -0500681 // All is well start the backend cluster connections
682 be.connectAll()
683
684 return be, nil
685}
686
687//***************************************************************//
688//********************* Backend Functions ***********************//
689//***************************************************************//
690
// incConn increments the backend's open-connection count under its lock.
func (be *backend) incConn() {
	be.lck.Lock()
	defer be.lck.Unlock()
	be.opnConns++
}
696
697func (be *backend) decConn() {
698 be.lck.Lock()
699 defer be.lck.Unlock()
700 be.opnConns--
701 if be.opnConns < 0 {
702 log.Error("Internal error, number of open connections less than 0")
703 be.opnConns = 0
704 }
705}
706
707// Attempts to establish all the connections for a backend
708// any failures result in an abort. This should only be called
709// on a first attempt to connect. Individual connections should be
710// handled after that.
711func (be *backend) connectAll() {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400712 for _, cn := range be.connections {
sslobodr392ebd52019-01-18 12:41:49 -0500713 cn.connect()
714 }
715}
716
717func (cn *beConnection) connect() {
718 if cn.addr != "" && cn.getConn() == nil {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400719 log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name, cn.addr, cn.port)
sslobodr392ebd52019-01-18 12:41:49 -0500720 // Dial doesn't block, it just returns and continues connecting in the background.
721 // Check back later to confirm and increase the connection count.
722 ctx, cnclFnc := context.WithCancel(context.Background()) // Context for canceling the connection
723 cn.setCncl(cnclFnc)
724 if conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure()); err != nil {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400725 log.Errorf("Dialng connection %v:%v", cn, err)
sslobodr392ebd52019-01-18 12:41:49 -0500726 cn.waitAndTryAgain(ctx)
727 } else {
728 cn.setConn(conn)
729 log.Debugf("Starting the connection monitor for '%s'", cn.name)
730 cn.monitor(ctx)
731 }
732 } else if cn.addr == "" {
733 log.Infof("No address supplied for connection '%s', not connecting for now", cn.name)
734 } else {
735 log.Debugf("Connection '%s' is already connected, ignoring", cn.name)
736 }
737}
738
739func (cn *beConnection) waitAndTryAgain(ctx context.Context) {
740 go func(ctx context.Context) {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400741 ctxTm, cnclTm := context.WithTimeout(context.Background(), 10*time.Second)
742 select {
743 case <-ctxTm.Done():
744 cnclTm()
745 log.Debugf("Trying to connect '%s'", cn.name)
746 // Connect creates a new context so cancel this one.
747 cn.cancel()
748 cn.connect()
749 return
750 case <-ctx.Done():
751 cnclTm()
752 return
753 }
sslobodr392ebd52019-01-18 12:41:49 -0500754 }(ctx)
755}
756
757func (cn *beConnection) cancel() {
758 cn.lck.Lock()
759 defer cn.lck.Unlock()
760 log.Debugf("Canceling connection %s", cn.name)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400761 if cn.gConn != nil {
sslobodr392ebd52019-01-18 12:41:49 -0500762 if cn.gConn.cncl != nil {
763 cn.cncl()
764 } else {
765 log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
766 }
767 } else {
768 log.Errorf("Internal error, attempting to cancel on a nil connection object: '%s'", cn.name)
769 }
770}
771
772func (cn *beConnection) setCncl(cncl context.CancelFunc) {
773 cn.lck.Lock()
774 defer cn.lck.Unlock()
775 if cn.gConn != nil {
776 cn.gConn.cncl = cncl
777 } else {
778 log.Errorf("Internal error, attempting to set a cancel function on a nil connection object: '%s'", cn.name)
779 }
780}
781
782func (cn *beConnection) setConn(conn *grpc.ClientConn) {
783 cn.lck.Lock()
784 defer cn.lck.Unlock()
785 if cn.gConn != nil {
786 cn.gConn.conn = conn
787 } else {
788 log.Errorf("Internal error, attempting to set a connection on a nil connection object: '%s'", cn.name)
789 }
790}
791
792func (cn *beConnection) getConn() *grpc.ClientConn {
793 cn.lck.Lock()
794 defer cn.lck.Unlock()
795 if cn.gConn != nil {
796 return cn.gConn.conn
797 }
798 return nil
799}
800
// close tears down the current gRPC connection. The ordering here
// matters: the backend's open-connection count is decremented first
// (only if the connection was counted, i.e. in the Ready state), the
// monitor's context is canceled so its goroutine exits, and only then
// is the connection closed. Finally the gConn object is replaced with
// a fresh one so later connects start from a clean state.
func (cn *beConnection) close() {
	cn.lck.Lock()
	defer cn.lck.Unlock()
	log.Debugf("Closing connection %s", cn.name)
	if cn.gConn != nil && cn.gConn.conn != nil {
		// Only Ready connections were counted by the monitor, so only
		// decrement for that state.
		if cn.gConn.conn.GetState() == connectivity.Ready {
			cn.bknd.decConn() // Decrease the connection reference
		}
		if cn.gConn.cncl != nil {
			cn.gConn.cncl() // Cancel the context first to force monitor functions to exit
		} else {
			log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
		}
		cn.gConn.conn.Close() // Close the connection
		// Now replace the gConn object with a new one as this one just
		// fades away as references to it are released after the close
		// finishes in the background.
		cn.gConn = &gConnection{conn: nil, cncl: nil, state: connectivity.TransientFailure}
	} else {
		log.Errorf("Internal error, attempt to close a nil connection object for '%s'", cn.name)
	}

}
824
825func (cn *beConnection) setState(st connectivity.State) {
826 cn.lck.Lock()
827 defer cn.lck.Unlock()
828 if cn.gConn != nil {
829 cn.gConn.state = st
830 } else {
831 log.Errorf("Internal error, attempting to set connection state on a nil connection object: '%s'", cn.name)
832 }
833}
834
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400835func (cn *beConnection) getState() connectivity.State {
sslobodr392ebd52019-01-18 12:41:49 -0500836 cn.lck.Lock()
837 defer cn.lck.Unlock()
838 if cn.gConn != nil {
839 if cn.gConn.conn != nil {
840 return cn.gConn.conn.GetState()
841 } else {
842 log.Errorf("Internal error, attempting to get connection state on a nil connection: '%s'", cn.name)
843 }
844 } else {
845 log.Errorf("Internal error, attempting to get connection state on a nil connection object: '%s'", cn.name)
846 }
847 // For lack of a better state to use. The logs will help determine what happened here.
848 return connectivity.TransientFailure
849}
850
sslobodr392ebd52019-01-18 12:41:49 -0500851func (cn *beConnection) monitor(ctx context.Context) {
852 bp := cn.bknd
853 log.Debugf("Setting up monitoring for backend %s", bp.name)
854 go func(ctx context.Context) {
855 var delay time.Duration = 100 //ms
856 for {
857 //log.Debugf("****** Monitoring connection '%s' on backend '%s', %v", cn.name, bp.name, cn.conn)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400858 if cn.getState() == connectivity.Ready {
sslobodr392ebd52019-01-18 12:41:49 -0500859 log.Debugf("connection '%s' on backend '%s' becomes ready", cn.name, bp.name)
860 cn.setState(connectivity.Ready)
861 bp.incConn()
862 if cn.getConn() != nil && cn.getConn().WaitForStateChange(ctx, connectivity.Ready) == false {
863 // The context was canceled. This is done by the close function
864 // so just exit the routine
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400865 log.Debugf("Contxt canceled for connection '%s' on backend '%s'", cn.name, bp.name)
sslobodr392ebd52019-01-18 12:41:49 -0500866 return
867 }
868 if cs := cn.getConn(); cs != nil {
869 switch cs := cn.getState(); cs {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400870 case connectivity.TransientFailure:
871 cn.setState(cs)
872 bp.decConn()
873 log.Infof("Transient failure for connection '%s' on backend '%s'", cn.name, bp.name)
874 delay = 100
875 case connectivity.Shutdown:
876 //The connection was closed. The assumption here is that the closer
877 // will manage the connection count and setting the conn to nil.
878 // Exit the routine
879 log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, bp.name)
880 return
881 case connectivity.Idle:
882 // This can only happen if the server sends a GoAway. This can
883 // only happen if the server has modified MaxConnectionIdle from
884 // its default of infinity. The only solution here is to close the
885 // connection and keepTrying()?
886 //TODO: Read the grpc source code to see if there's a different approach
887 log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, bp.name)
888 cn.close()
889 cn.connect()
890 return
sslobodr392ebd52019-01-18 12:41:49 -0500891 }
892 } else { // A nil means something went horribly wrong, error and exit.
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400893 log.Errorf("Somthing horrible happned, the connection is nil and shouldn't be for connection %s", cn.name)
sslobodr392ebd52019-01-18 12:41:49 -0500894 return
895 }
896 } else {
897 log.Debugf("Waiting for connection '%s' on backend '%s' to become ready", cn.name, bp.name)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400898 ctxTm, cnclTm := context.WithTimeout(context.Background(), delay*time.Millisecond)
sslobodr392ebd52019-01-18 12:41:49 -0500899 if delay < 30000 {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400900 delay += delay
sslobodr392ebd52019-01-18 12:41:49 -0500901 }
902 select {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400903 case <-ctxTm.Done():
904 cnclTm() // Doubt this is required but it's harmless.
905 // Do nothing but let the loop continue
906 case <-ctx.Done():
907 // Context was closed, close and exit routine
908 //cn.close() NO! let the close be managed externally!
909 return
sslobodr392ebd52019-01-18 12:41:49 -0500910 }
911 }
912 }
913 }(ctx)
914}
915
// setConnFailCallback registers a callback for connection failure
// notification. The callback receives a connection identifier and the
// affected backend; the meaning of its boolean return value is not
// defined here (the callback is currently never invoked).
// This is currently not used.
func (bp *backend) setConnFailCallback(cb func(string, *backend) bool) {
	bp.connFailCallback = cb
}