blob: c597d493882721b24c0160f23de07b20f4e9ffd6 [file] [log] [blame]
sslobodr392ebd52019-01-18 12:41:49 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
sslobodr392ebd52019-01-18 12:41:49 -050016
17package afrouter
18
19// Backend manager handles redundant connections per backend
20
21import (
sslobodr392ebd52019-01-18 12:41:49 -050022 "errors"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040023 "fmt"
24 "github.com/opencord/voltha-go/common/log"
sslobodr392ebd52019-01-18 12:41:49 -050025 "golang.org/x/net/context"
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/codes"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040028 "google.golang.org/grpc/metadata"
Kent Hagermanfcfb16b2019-06-20 11:40:03 -040029 "net/url"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040030 "strconv"
31 "strings"
32 "sync"
sslobodr392ebd52019-01-18 12:41:49 -050033)
34
Kent Hagerman1e9061e2019-05-21 16:01:21 -040035// backend represents a collection of backends in a HA configuration
sslobodr392ebd52019-01-18 12:41:49 -050036type backend struct {
Kent Hagerman1e9061e2019-05-21 16:01:21 -040037 mutex sync.Mutex
38 name string
39 beType backendType
40 activeAssociation association
41 connFailCallback func(string, *backend) bool
42 connections map[string]*connection
Kent Hagerman03b58992019-08-29 17:21:03 -040043 openConns map[*connection]*grpc.ClientConn
44 activeRequests map[*request]struct{}
sslobodr392ebd52019-01-18 12:41:49 -050045}
46
Kent Hagerman1e9061e2019-05-21 16:01:21 -040047type association struct {
48 strategy associationStrategy
49 location associationLocation
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040050 field string // Used only if location is protobuf
51 key string
sslobodr392ebd52019-01-18 12:41:49 -050052}
53
Kent Hagerman03b58992019-08-29 17:21:03 -040054// splitActiveStreamsUnsafe expects the caller to have already locked the backend mutex
55func (be *backend) splitActiveStreamsUnsafe(cn *connection, conn *grpc.ClientConn) {
56 if len(be.activeRequests) != 0 {
57 log.Debugf("Creating new streams for %d existing requests", len(be.activeRequests))
58 }
59 for r := range be.activeRequests {
60 r.mutex.Lock()
61 if _, have := r.streams[cn.name]; !have {
62 log.Debugf("Opening southbound stream for existing request '%s'", r.methodInfo.method)
63 if stream, err := grpc.NewClientStream(r.ctx, clientStreamDescForProxying, conn, r.methodInfo.all); err != nil {
64 log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
65 } else {
66 go r.catchupRequestStreamThenForwardResponseStream(cn.name, stream)
67 // new thread will unlock the request mutex
68 continue
69 }
70 }
71 r.mutex.Unlock()
72 }
73}
sslobodr392ebd52019-01-18 12:41:49 -050074
Kent Hagerman03b58992019-08-29 17:21:03 -040075// openSouthboundStreams sets up a connection to each southbound frame
76func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, nf *requestFrame, sf *responseFrame) (*request, error) {
77 be.mutex.Lock()
78 defer be.mutex.Unlock()
sslobodr392ebd52019-01-18 12:41:49 -050079
Kent Hagerman03b58992019-08-29 17:21:03 -040080 isStreamingRequest, isStreamingResponse := nf.router.IsStreaming(nf.methodInfo.method)
81
sslobodr392ebd52019-01-18 12:41:49 -050082 // Get the metadata from the incoming message on the server
83 md, ok := metadata.FromIncomingContext(serverStream.Context())
84 if !ok {
Kent Hagerman03b58992019-08-29 17:21:03 -040085 return nil, errors.New("could not get a server stream metadata")
sslobodr392ebd52019-01-18 12:41:49 -050086 }
87
Kent Hagerman03b58992019-08-29 17:21:03 -040088 r := &request{
89 // Create an outgoing context that includes the incoming metadata and that will cancel if the server's context is canceled
A R Karthick691c9762019-09-20 22:22:59 +000090 ctx: metadata.AppendToOutgoingContext(metadata.NewOutgoingContext(serverStream.Context(), md.Copy()), "voltha_serial_number", nf.serialNo),
Kent Hagerman03b58992019-08-29 17:21:03 -040091
92 streams: make(map[string]grpc.ClientStream),
93 responseErrChan: make(chan error, 1),
94
95 backend: be,
96 serverStream: serverStream,
97 methodInfo: nf.methodInfo,
98 requestFrame: nf,
99 responseFrame: sf,
100 isStreamingRequest: isStreamingRequest,
101 isStreamingResponse: isStreamingResponse,
102 }
103
104 log.Debugf("Opening southbound request for method '%s'", nf.methodInfo.method)
105
sslobodr392ebd52019-01-18 12:41:49 -0500106 // TODO: Need to check if this is an active/active backend cluster
107 // with a serial number in the header.
A R Karthick691c9762019-09-20 22:22:59 +0000108 log.Debugf("Serial number for transaction allocated: %s", nf.serialNo)
sslobodr392ebd52019-01-18 12:41:49 -0500109 // If even one stream can be created then proceed. If none can be
Kent Hagerman03b58992019-08-29 17:21:03 -0400110 // created then report an error because both the primary and redundant
111 // connections are non-existent.
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400112 var atLeastOne = false
sslobodr392ebd52019-01-18 12:41:49 -0500113 var errStr strings.Builder
Scott Baker112b0d42019-08-22 08:32:26 -0700114
Kent Hagerman03b58992019-08-29 17:21:03 -0400115 log.Debugf("There are %d/%d streams to open", len(be.openConns), len(be.connections))
Scott Baker112b0d42019-08-22 08:32:26 -0700116 if nf.connection != nil {
117 // Debug statement triggered by source router. Other routers have no connection preference.
118 log.Debugf("Looking for connection %s", nf.connection.name)
119 }
Kent Hagerman03b58992019-08-29 17:21:03 -0400120 for cn, conn := range be.openConns {
Scott Baker112b0d42019-08-22 08:32:26 -0700121 // If source-router was used, it will indicate a specific connection to be used
122 if nf.connection != nil && nf.connection != cn {
123 continue
124 }
125
Kent Hagerman03b58992019-08-29 17:21:03 -0400126 log.Debugf("Opening stream for connection '%s'", cn.name)
127 if stream, err := grpc.NewClientStream(r.ctx, clientStreamDescForProxying, conn, r.methodInfo.all); err != nil {
128 log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
sslobodr392ebd52019-01-18 12:41:49 -0500129 } else {
Kent Hagerman03b58992019-08-29 17:21:03 -0400130 r.streams[cn.name] = stream
131 go r.forwardResponseStream(cn.name, stream)
132 atLeastOne = true
sslobodr392ebd52019-01-18 12:41:49 -0500133 }
134 }
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400135 if atLeastOne {
Kent Hagerman03b58992019-08-29 17:21:03 -0400136 be.activeRequests[r] = struct{}{}
137 return r, nil
sslobodr392ebd52019-01-18 12:41:49 -0500138 }
Kent Hagerman03b58992019-08-29 17:21:03 -0400139 fmt.Fprintf(&errStr, "{{No open connections for backend '%s' unable to send}} ", be.name)
sslobodr392ebd52019-01-18 12:41:49 -0500140 log.Error(errStr.String())
141 return nil, errors.New(errStr.String())
142}
143
Kent Hagerman03b58992019-08-29 17:21:03 -0400144func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *requestFrame, sf *responseFrame) error {
145 // Set up streams for each open connection
146 request, err := be.openSouthboundStreams(srv, serverStream, nf, sf)
sslobodr392ebd52019-01-18 12:41:49 -0500147 if err != nil {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400148 log.Errorf("openStreams failed: %v", err)
sslobodr392ebd52019-01-18 12:41:49 -0500149 return err
150 }
sslobodr392ebd52019-01-18 12:41:49 -0500151
Kent Hagerman03b58992019-08-29 17:21:03 -0400152 log.Debug("Starting request stream forwarding")
153 if s2cErr := request.forwardRequestStream(serverStream); s2cErr != nil {
154 // exit with an error to the stack
155 return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
sslobodr392ebd52019-01-18 12:41:49 -0500156 }
Kent Hagerman03b58992019-08-29 17:21:03 -0400157 // wait for response stream to complete
158 return <-request.responseErrChan
sslobodr392ebd52019-01-18 12:41:49 -0500159}
160
sslobodr392ebd52019-01-18 12:41:49 -0500161func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
162 var rtrn_err bool = false
163
164 log.Debugf("Configuring the backend with %v", *conf)
165 // Validate the conifg and configure the backend
Kent Hagerman03b58992019-08-29 17:21:03 -0400166 be := &backend{
167 name: conf.Name,
168 connections: make(map[string]*connection),
169 openConns: make(map[*connection]*grpc.ClientConn),
170 activeRequests: make(map[*request]struct{}),
171 }
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400172 if conf.Type == BackendUndefined {
sslobodr392ebd52019-01-18 12:41:49 -0500173 log.Error("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName)
174 rtrn_err = true
175 }
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400176 be.beType = conf.Type
sslobodr392ebd52019-01-18 12:41:49 -0500177
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400178 if conf.Association.Strategy == AssociationStrategyUndefined && be.beType == BackendActiveActive {
sslobodr392ebd52019-01-18 12:41:49 -0500179 log.Errorf("An association strategy must be provided if the backend "+
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400180 "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
sslobodr392ebd52019-01-18 12:41:49 -0500181 rtrn_err = true
182 }
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400183 be.activeAssociation.strategy = conf.Association.Strategy
sslobodr392ebd52019-01-18 12:41:49 -0500184
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400185 if conf.Association.Location == AssociationLocationUndefined && be.beType == BackendActiveActive {
sslobodr392ebd52019-01-18 12:41:49 -0500186 log.Errorf("An association location must be provided if the backend "+
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400187 "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
sslobodr392ebd52019-01-18 12:41:49 -0500188 rtrn_err = true
189 }
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400190 be.activeAssociation.location = conf.Association.Location
sslobodr392ebd52019-01-18 12:41:49 -0500191
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400192 if conf.Association.Field == "" && be.activeAssociation.location == AssociationLocationProtobuf {
sslobodr392ebd52019-01-18 12:41:49 -0500193 log.Errorf("An association field must be provided if the backend "+
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400194 "type is active/active and the location is set to protobuf "+
195 "for backend %s in cluster %s", conf.Name, clusterName)
sslobodr392ebd52019-01-18 12:41:49 -0500196 rtrn_err = true
197 }
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400198 be.activeAssociation.field = conf.Association.Field
sslobodr8e2ccb52019-02-05 09:21:47 -0500199
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400200 if conf.Association.Key == "" && be.activeAssociation.location == AssociationLocationHeader {
sslobodr8e2ccb52019-02-05 09:21:47 -0500201 log.Errorf("An association key must be provided if the backend "+
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400202 "type is active/active and the location is set to header "+
203 "for backend %s in cluster %s", conf.Name, clusterName)
sslobodr8e2ccb52019-02-05 09:21:47 -0500204 rtrn_err = true
205 }
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400206 be.activeAssociation.key = conf.Association.Key
sslobodr392ebd52019-01-18 12:41:49 -0500207 if rtrn_err {
208 return nil, errors.New("Backend configuration failed")
209 }
210 // Configure the connections
211 // Connections can consist of just a name. This allows for dynamic configuration
212 // at a later time.
213 // TODO: validate that there is one connection for all but active/active backends
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400214 if len(conf.Connections) > 1 && be.beType != BackendActiveActive {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400215 log.Errorf("Only one connection must be specified if the association " +
216 "strategy is not set to 'active_active'")
sslobodr8e2ccb52019-02-05 09:21:47 -0500217 rtrn_err = true
218 }
219 if len(conf.Connections) == 0 {
220 log.Errorf("At least one connection must be specified")
221 rtrn_err = true
222 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400223 for _, cnConf := range conf.Connections {
sslobodr392ebd52019-01-18 12:41:49 -0500224 if cnConf.Name == "" {
225 log.Errorf("A connection must have a name for backend %s in cluster %s",
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400226 conf.Name, clusterName)
sslobodr392ebd52019-01-18 12:41:49 -0500227 } else {
Kent Hagerman03b58992019-08-29 17:21:03 -0400228 ctx, cancelFunc := context.WithCancel(context.Background())
229 be.connections[cnConf.Name] = &connection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, backend: be, ctx: ctx, close: cancelFunc}
230 if _, err := url.Parse(cnConf.Addr); err != nil {
231 log.Errorf("The address for connection %s in backend %s in cluster %s is invalid: %s",
232 cnConf.Name, conf.Name, clusterName, err)
233 rtrn_err = true
234 }
235 // Validate the port number. This just validtes that it's a non 0 integer
236 if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
237 log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
238 cnConf.Port, cnConf.Name, conf.Name, clusterName)
239 rtrn_err = true
240 } else {
241 if n <= 0 && n > 65535 {
sslobodr392ebd52019-01-18 12:41:49 -0500242 log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
243 cnConf.Port, cnConf.Name, conf.Name, clusterName)
244 rtrn_err = true
sslobodr392ebd52019-01-18 12:41:49 -0500245 }
246 }
247 }
248 }
sslobodr63d160c2019-02-08 14:25:13 -0500249
sslobodr392ebd52019-01-18 12:41:49 -0500250 if rtrn_err {
251 return nil, errors.New("Connection configuration failed")
252 }
sslobodr392ebd52019-01-18 12:41:49 -0500253 // All is well start the backend cluster connections
254 be.connectAll()
255
256 return be, nil
257}
258
Kent Hagerman03b58992019-08-29 17:21:03 -0400259func (be *backend) incConn(cn *connection, conn *grpc.ClientConn) {
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400260 be.mutex.Lock()
261 defer be.mutex.Unlock()
Kent Hagerman03b58992019-08-29 17:21:03 -0400262
263 be.openConns[cn] = conn
264 be.splitActiveStreamsUnsafe(cn, conn)
sslobodr392ebd52019-01-18 12:41:49 -0500265}
266
Kent Hagerman03b58992019-08-29 17:21:03 -0400267func (be *backend) decConn(cn *connection) {
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400268 be.mutex.Lock()
269 defer be.mutex.Unlock()
Kent Hagerman03b58992019-08-29 17:21:03 -0400270
271 delete(be.openConns, cn)
272}
273
274func (be *backend) NumOpenConnections() int {
275 be.mutex.Lock()
276 defer be.mutex.Unlock()
277
278 return len(be.openConns)
sslobodr392ebd52019-01-18 12:41:49 -0500279}
280
281// Attempts to establish all the connections for a backend
282// any failures result in an abort. This should only be called
283// on a first attempt to connect. Individual connections should be
284// handled after that.
285func (be *backend) connectAll() {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400286 for _, cn := range be.connections {
Kent Hagerman03b58992019-08-29 17:21:03 -0400287 go cn.connect()
sslobodr392ebd52019-01-18 12:41:49 -0500288 }
289}
290
sslobodr392ebd52019-01-18 12:41:49 -0500291// Set a callback for connection failure notification
292// This is currently not used.
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400293func (be *backend) setConnFailCallback(cb func(string, *backend) bool) {
294 be.connFailCallback = cb
sslobodr392ebd52019-01-18 12:41:49 -0500295}