blob: f616f6b7f55f691b221504975180add6776a9d6c [file] [log] [blame]
sslobodr392ebd52019-01-18 12:41:49 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
sslobodr392ebd52019-01-18 12:41:49 -050016
17package afrouter
18
19// Backend manager handles redundant connections per backend
20
21import (
sslobodr392ebd52019-01-18 12:41:49 -050022 "errors"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040023 "fmt"
24 "github.com/opencord/voltha-go/common/log"
sslobodr392ebd52019-01-18 12:41:49 -050025 "golang.org/x/net/context"
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/codes"
sslobodr392ebd52019-01-18 12:41:49 -050028 "google.golang.org/grpc/connectivity"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040029 "google.golang.org/grpc/metadata"
30 "io"
31 "net"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040032 "strconv"
33 "strings"
34 "sync"
sslobodr392ebd52019-01-18 12:41:49 -050035)
36
Kent Hagerman1e9061e2019-05-21 16:01:21 -040037// backend represents a collection of backends in a HA configuration
sslobodr392ebd52019-01-18 12:41:49 -050038type backend struct {
Kent Hagerman1e9061e2019-05-21 16:01:21 -040039 mutex sync.Mutex
40 name string
41 beType backendType
42 activeAssociation association
43 connFailCallback func(string, *backend) bool
44 connections map[string]*connection
45 openConns int
sslobodr392ebd52019-01-18 12:41:49 -050046}
47
Kent Hagerman1e9061e2019-05-21 16:01:21 -040048type association struct {
49 strategy associationStrategy
50 location associationLocation
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040051 field string // Used only if location is protobuf
52 key string
sslobodr392ebd52019-01-18 12:41:49 -050053}
54
Kent Hagerman1e9061e2019-05-21 16:01:21 -040055func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f *nbFrame) (*streams, error) {
sslobodr392ebd52019-01-18 12:41:49 -050056
Kent Hagerman1e9061e2019-05-21 16:01:21 -040057 rtrn := &streams{streams: make(map[string]*stream), activeStream: nil}
sslobodr392ebd52019-01-18 12:41:49 -050058
Kent Hagerman1e9061e2019-05-21 16:01:21 -040059 log.Debugf("Opening southbound streams for method '%s'", f.methodInfo.method)
sslobodr392ebd52019-01-18 12:41:49 -050060 // Get the metadata from the incoming message on the server
61 md, ok := metadata.FromIncomingContext(serverStream.Context())
62 if !ok {
63 return nil, errors.New("Could not get a server stream metadata")
64 }
65
66 // TODO: Need to check if this is an active/active backend cluster
67 // with a serial number in the header.
Kent Hagerman1e9061e2019-05-21 16:01:21 -040068 log.Debugf("Serial number for transaction allocated: %d", f.serialNo)
sslobodr392ebd52019-01-18 12:41:49 -050069 // If even one stream can be created then proceed. If none can be
70 // created then report an error becase both the primary and redundant
71 // connections are non-existant.
Kent Hagerman1e9061e2019-05-21 16:01:21 -040072 var atLeastOne = false
sslobodr392ebd52019-01-18 12:41:49 -050073 var errStr strings.Builder
74 log.Debugf("There are %d connections to open", len(be.connections))
Kent Hagerman1b9c7062019-05-07 16:46:01 -040075 for _, cn := range be.connections {
sslobodr392ebd52019-01-18 12:41:49 -050076 // Copy in the metadata
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040077 if cn.getState() == connectivity.Ready && cn.getConn() != nil {
sslobodr63d160c2019-02-08 14:25:13 -050078 log.Debugf("Opening southbound stream for connection '%s'", cn.name)
sslobodr392ebd52019-01-18 12:41:49 -050079 // Create an outgoing context that includes the incoming metadata
80 // and that will cancel if the server's context is canceled
81 clientCtx, clientCancel := context.WithCancel(serverStream.Context())
82 clientCtx = metadata.NewOutgoingContext(clientCtx, md.Copy())
83 //TODO: Same check here, only add the serial number if necessary
84 clientCtx = metadata.AppendToOutgoingContext(clientCtx, "voltha_serial_number",
Kent Hagerman1e9061e2019-05-21 16:01:21 -040085 strconv.FormatUint(f.serialNo, 10))
sslobodr392ebd52019-01-18 12:41:49 -050086 // Create the client stream
87 if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying,
Kent Hagerman1e9061e2019-05-21 16:01:21 -040088 cn.getConn(), f.methodInfo.all); err != nil {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040089 log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
sslobodr392ebd52019-01-18 12:41:49 -050090 fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err)
Kent Hagerman1e9061e2019-05-21 16:01:21 -040091 rtrn.streams[cn.name] = nil
sslobodr392ebd52019-01-18 12:41:49 -050092 } else {
Kent Hagerman1e9061e2019-05-21 16:01:21 -040093 rtrn.streams[cn.name] = &stream{stream: clientStream, ctx: clientCtx,
94 cancel: clientCancel, s2cReturn: nil,
95 ok2Close: make(chan struct{}),
96 c2sReturn: make(chan error, 1)}
sslobodr392ebd52019-01-18 12:41:49 -050097 atLeastOne = true
98 }
99 } else if cn.getConn() == nil {
100 err := errors.New(fmt.Sprintf("Connection '%s' is closed", cn.name))
101 fmt.Fprint(&errStr, err.Error())
102 log.Debug(err)
103 } else {
104 err := errors.New(fmt.Sprintf("Connection '%s' isn't ready", cn.name))
105 fmt.Fprint(&errStr, err.Error())
106 log.Debug(err)
107 }
108 }
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400109 if atLeastOne {
sslobodr63d160c2019-02-08 14:25:13 -0500110 rtrn.sortStreams()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400111 return rtrn, nil
sslobodr392ebd52019-01-18 12:41:49 -0500112 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400113 fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ", be.name)
sslobodr392ebd52019-01-18 12:41:49 -0500114 log.Error(errStr.String())
115 return nil, errors.New(errStr.String())
116}
117
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400118func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *nbFrame, sf *sbFrame) error {
sslobodr392ebd52019-01-18 12:41:49 -0500119
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400120 // Set up and launch each individual southbound stream
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400121 var beStrms *streams
sslobodr63d160c2019-02-08 14:25:13 -0500122 var rtrn error = nil
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400123 var s2cOk = false
124 var c2sOk = false
sslobodr392ebd52019-01-18 12:41:49 -0500125
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400126 beStrms, err := be.openSouthboundStreams(srv, serverStream, nf)
sslobodr392ebd52019-01-18 12:41:49 -0500127 if err != nil {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400128 log.Errorf("openStreams failed: %v", err)
sslobodr392ebd52019-01-18 12:41:49 -0500129 return err
130 }
131 // If we get here, there has to be AT LEAST ONE open stream
132
133 // *Do not explicitly close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
134 // Channels do not have to be closed, it is just a control flow mechanism, see
135 // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
136
137 log.Debug("Starting server to client forwarding")
138 s2cErrChan := beStrms.forwardServerToClient(serverStream, nf)
139
140 log.Debug("Starting client to server forwarding")
141 c2sErrChan := beStrms.forwardClientToServer(serverStream, sf)
142
143 // We don't know which side is going to stop sending first, so we need a select between the two.
144 for i := 0; i < 2; i++ {
145 select {
146 case s2cErr := <-s2cErrChan:
sslobodr63d160c2019-02-08 14:25:13 -0500147 s2cOk = true
sslobodr392ebd52019-01-18 12:41:49 -0500148 log.Debug("Processing s2cErr")
149 if s2cErr == io.EOF {
150 log.Debug("s2cErr reporting EOF")
151 // this is the successful case where the sender has encountered io.EOF, and won't be sending anymore./
152 // the clientStream>serverStream may continue sending though.
sslobodr63d160c2019-02-08 14:25:13 -0500153 defer beStrms.closeSend()
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400154 if c2sOk {
sslobodr63d160c2019-02-08 14:25:13 -0500155 return rtrn
156 }
sslobodr392ebd52019-01-18 12:41:49 -0500157 } else {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400158 log.Debugf("s2cErr reporting %v", s2cErr)
sslobodr392ebd52019-01-18 12:41:49 -0500159 // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
160 // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
161 // exit with an error to the stack
162 beStrms.clientCancel()
163 return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
164 }
165 case c2sErr := <-c2sErrChan:
sslobodr63d160c2019-02-08 14:25:13 -0500166 c2sOk = true
sslobodr392ebd52019-01-18 12:41:49 -0500167 log.Debug("Processing c2sErr")
168 // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
169 // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
170 // will be nil.
171 serverStream.SetTrailer(beStrms.trailer())
172 // c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
sslobodr63d160c2019-02-08 14:25:13 -0500173 // NOTE!!! with redundant backends, it's likely that one of the backends
174 // returns a response before all the data has been sent southbound and
175 // the southbound streams are closed. Should this happen one of the
176 // backends may not get the request.
sslobodr392ebd52019-01-18 12:41:49 -0500177 if c2sErr != io.EOF {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400178 rtrn = c2sErr
sslobodr392ebd52019-01-18 12:41:49 -0500179 }
sslobodr63d160c2019-02-08 14:25:13 -0500180 log.Debug("c2sErr reporting EOF")
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400181 if s2cOk {
sslobodr63d160c2019-02-08 14:25:13 -0500182 return rtrn
183 }
sslobodr392ebd52019-01-18 12:41:49 -0500184 }
185 }
186 return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
187}
188
sslobodr392ebd52019-01-18 12:41:49 -0500189func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
190 var rtrn_err bool = false
191
192 log.Debugf("Configuring the backend with %v", *conf)
193 // Validate the conifg and configure the backend
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400194 be := &backend{name: conf.Name, connections: make(map[string]*connection), openConns: 0}
195 if conf.Type == BackendUndefined {
sslobodr392ebd52019-01-18 12:41:49 -0500196 log.Error("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName)
197 rtrn_err = true
198 }
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400199 be.beType = conf.Type
sslobodr392ebd52019-01-18 12:41:49 -0500200
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400201 if conf.Association.Strategy == AssociationStrategyUndefined && be.beType == BackendActiveActive {
sslobodr392ebd52019-01-18 12:41:49 -0500202 log.Errorf("An association strategy must be provided if the backend "+
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400203 "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
sslobodr392ebd52019-01-18 12:41:49 -0500204 rtrn_err = true
205 }
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400206 be.activeAssociation.strategy = conf.Association.Strategy
sslobodr392ebd52019-01-18 12:41:49 -0500207
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400208 if conf.Association.Location == AssociationLocationUndefined && be.beType == BackendActiveActive {
sslobodr392ebd52019-01-18 12:41:49 -0500209 log.Errorf("An association location must be provided if the backend "+
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400210 "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
sslobodr392ebd52019-01-18 12:41:49 -0500211 rtrn_err = true
212 }
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400213 be.activeAssociation.location = conf.Association.Location
sslobodr392ebd52019-01-18 12:41:49 -0500214
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400215 if conf.Association.Field == "" && be.activeAssociation.location == AssociationLocationProtobuf {
sslobodr392ebd52019-01-18 12:41:49 -0500216 log.Errorf("An association field must be provided if the backend "+
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400217 "type is active/active and the location is set to protobuf "+
218 "for backend %s in cluster %s", conf.Name, clusterName)
sslobodr392ebd52019-01-18 12:41:49 -0500219 rtrn_err = true
220 }
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400221 be.activeAssociation.field = conf.Association.Field
sslobodr8e2ccb52019-02-05 09:21:47 -0500222
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400223 if conf.Association.Key == "" && be.activeAssociation.location == AssociationLocationHeader {
sslobodr8e2ccb52019-02-05 09:21:47 -0500224 log.Errorf("An association key must be provided if the backend "+
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400225 "type is active/active and the location is set to header "+
226 "for backend %s in cluster %s", conf.Name, clusterName)
sslobodr8e2ccb52019-02-05 09:21:47 -0500227 rtrn_err = true
228 }
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400229 be.activeAssociation.key = conf.Association.Key
sslobodr392ebd52019-01-18 12:41:49 -0500230 if rtrn_err {
231 return nil, errors.New("Backend configuration failed")
232 }
233 // Configure the connections
234 // Connections can consist of just a name. This allows for dynamic configuration
235 // at a later time.
236 // TODO: validate that there is one connection for all but active/active backends
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400237 if len(conf.Connections) > 1 && be.beType != BackendActiveActive {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400238 log.Errorf("Only one connection must be specified if the association " +
239 "strategy is not set to 'active_active'")
sslobodr8e2ccb52019-02-05 09:21:47 -0500240 rtrn_err = true
241 }
242 if len(conf.Connections) == 0 {
243 log.Errorf("At least one connection must be specified")
244 rtrn_err = true
245 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400246 for _, cnConf := range conf.Connections {
sslobodr392ebd52019-01-18 12:41:49 -0500247 if cnConf.Name == "" {
248 log.Errorf("A connection must have a name for backend %s in cluster %s",
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400249 conf.Name, clusterName)
sslobodr392ebd52019-01-18 12:41:49 -0500250 } else {
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400251 gc := &gConnection{conn: nil, cancel: nil, state: connectivity.Idle}
252 be.connections[cnConf.Name] = &connection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, backend: be, gConn: gc}
sslobodr392ebd52019-01-18 12:41:49 -0500253 if cnConf.Addr != "" { // This connection will be specified later.
254 if ip := net.ParseIP(cnConf.Addr); ip == nil {
255 log.Errorf("The IP address for connection %s in backend %s in cluster %s is invalid",
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400256 cnConf.Name, conf.Name, clusterName)
sslobodr392ebd52019-01-18 12:41:49 -0500257 rtrn_err = true
258 }
259 // Validate the port number. This just validtes that it's a non 0 integer
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400260 if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
sslobodr392ebd52019-01-18 12:41:49 -0500261 log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
262 cnConf.Port, cnConf.Name, conf.Name, clusterName)
263 rtrn_err = true
264 } else {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400265 if n <= 0 && n > 65535 {
sslobodr392ebd52019-01-18 12:41:49 -0500266 log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
267 cnConf.Port, cnConf.Name, conf.Name, clusterName)
268 rtrn_err = true
269 }
270 }
271 }
272 }
273 }
sslobodr63d160c2019-02-08 14:25:13 -0500274
sslobodr392ebd52019-01-18 12:41:49 -0500275 if rtrn_err {
276 return nil, errors.New("Connection configuration failed")
277 }
sslobodr392ebd52019-01-18 12:41:49 -0500278 // All is well start the backend cluster connections
279 be.connectAll()
280
281 return be, nil
282}
283
sslobodr392ebd52019-01-18 12:41:49 -0500284func (be *backend) incConn() {
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400285 be.mutex.Lock()
286 defer be.mutex.Unlock()
287 be.openConns++
sslobodr392ebd52019-01-18 12:41:49 -0500288}
289
290func (be *backend) decConn() {
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400291 be.mutex.Lock()
292 defer be.mutex.Unlock()
293 be.openConns--
294 if be.openConns < 0 {
sslobodr392ebd52019-01-18 12:41:49 -0500295 log.Error("Internal error, number of open connections less than 0")
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400296 be.openConns = 0
sslobodr392ebd52019-01-18 12:41:49 -0500297 }
298}
299
300// Attempts to establish all the connections for a backend
301// any failures result in an abort. This should only be called
302// on a first attempt to connect. Individual connections should be
303// handled after that.
304func (be *backend) connectAll() {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400305 for _, cn := range be.connections {
sslobodr392ebd52019-01-18 12:41:49 -0500306 cn.connect()
307 }
308}
309
sslobodr392ebd52019-01-18 12:41:49 -0500310// Set a callback for connection failure notification
311// This is currently not used.
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400312func (be *backend) setConnFailCallback(cb func(string, *backend) bool) {
313 be.connFailCallback = cb
sslobodr392ebd52019-01-18 12:41:49 -0500314}