blob: 0691da1d5eb3f9e41302236f43ac692ab1439225 [file] [log] [blame]
sslobodr392ebd52019-01-18 12:41:49 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16// gRPC affinity router with active/active backends
17
18package afrouter
19
20import (
sslobodr392ebd52019-01-18 12:41:49 -050021 "errors"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040022 "fmt"
23 "github.com/opencord/voltha-go/common/log"
sslobodr392ebd52019-01-18 12:41:49 -050024 "google.golang.org/grpc"
25 "google.golang.org/grpc/codes"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040026 "net"
27 "regexp"
28 "strconv"
sslobodr392ebd52019-01-18 12:41:49 -050029)
30
31var (
32 clientStreamDescForProxying = &grpc.StreamDesc{
33 ServerStreams: true,
34 ClientStreams: true,
35 }
36)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040037
// Indices into the submatch slice produced by mthdSlicer for a full gRPC
// method name.
const (
	REQ_ALL     = iota // 0: the whole matched text
	REQ_PACKAGE        // 1: the package name
	REQ_SERVICE        // 2: the service name
	REQ_METHOD         // 3: the method name
)
44
45type server struct {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040046 running bool
47 name string
48 stype nbi
sslobodr392ebd52019-01-18 12:41:49 -050049 proxyListener net.Listener
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040050 routers map[string]map[string]Router
51 proxyServer *grpc.Server
sslobodr392ebd52019-01-18 12:41:49 -050052}
53
54type nbRequest struct {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040055 srv interface{}
sslobodr392ebd52019-01-18 12:41:49 -050056 serverStream grpc.ServerStream
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040057 r Router
58 mthdSlice []string
59 metaKey string // There should be at most one key specified. More than one is an error.
60 metaVal string // This is the value extracted from the meta key if it exists or "" otherwise
sslobodr392ebd52019-01-18 12:41:49 -050061}
62
// mthdSlicerExp is the pattern used to split a full gRPC method name of the
// form /<package>.<service>/<method> into its three parts.
// NOTE(review): each part must start with a letter, be at least two
// characters long and contain only letters and digits, so names containing
// '_' or packages containing additional '.' do not match - confirm this is
// intended.
var mthdSlicerExp = `^/([a-zA-Z][a-zA-Z0-9]+)\.([a-zA-Z][a-zA-Z0-9]+)/([a-zA-Z][a-zA-Z0-9]+)`

// mthdSlicer is the compiled regex to extract the package/service/method.
// It is compiled once at package initialisation so that it is never nil,
// whether or not a server has been created yet.
var mthdSlicer = regexp.MustCompile(mthdSlicerExp)
65
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040066func newServer(config *ServerConfig) (*server, error) {
sslobodr392ebd52019-01-18 12:41:49 -050067 var err error = nil
68 var rtrn_err bool = false
69 var srvr *server
70 // Change over to the new configuration format
71 // Validate the configuration
72 // There should be a name
73 if config.Name == "" {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040074 log.Error("A server has been defined with no name")
sslobodr392ebd52019-01-18 12:41:49 -050075 rtrn_err = true
76 }
77 // Validate that there's a port specified
78 if config.Port == 0 {
79 log.Errorf("Server %s does not have a valid port assigned", config.Name)
80 rtrn_err = true
81 }
82 // Validate the ip address if one is provided
83 if ip := net.ParseIP(config.Addr); config.Addr != "" && ip == nil {
84 log.Errorf("Invalid address '%s' provided for server '%s'", config.Addr, config.Name)
85 rtrn_err = true
86 }
87 if config.Type != "grpc" && config.Type != "streaming_grpc" {
88 if config.Type == "" {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040089 log.Errorf("A server 'type' must be defined for server %s", config.Name)
sslobodr392ebd52019-01-18 12:41:49 -050090 } else {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040091 log.Errorf("The server type must be either 'grpc' or 'streaming_grpc' "+
92 "but '%s' was found for server '%s'", config.Type, config.Name)
sslobodr392ebd52019-01-18 12:41:49 -050093 }
94 rtrn_err = true
95 }
96 if len(config.Routers) == 0 {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040097 log.Errorf("At least one router must be specified for server '%s'", config.Name)
sslobodr392ebd52019-01-18 12:41:49 -050098 rtrn_err = true
99 }
100
101 if rtrn_err == true {
102 return nil, errors.New("Server configuration failed")
103 } else {
104 // The configuration is valid, create a server and configure it.
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400105 srvr = &server{name: config.Name, routers: make(map[string]map[string]Router)}
sslobodr392ebd52019-01-18 12:41:49 -0500106 // The listener
107 if srvr.proxyListener, err =
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400108 net.Listen("tcp", config.Addr+":"+
109 strconv.Itoa(int(config.Port))); err != nil {
sslobodr392ebd52019-01-18 12:41:49 -0500110 log.Error(err)
111 return nil, err
112 }
113 // Create the routers
114 log.Debugf("Configuring the routers for server %s", srvr.name)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400115 for p, r := range config.routers {
116 log.Debugf("Processing router %s for package %s", r.Name, p)
117 if dr, err := newRouter(r); err != nil {
118 log.Error(err)
119 return nil, err
120 } else {
sslobodr392ebd52019-01-18 12:41:49 -0500121 log.Debugf("Adding router %s to the server %s for package %s and service %s",
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400122 dr.Name(), srvr.name, p, dr.Service())
123 if _, ok := srvr.routers[p]; ok {
sslobodr392ebd52019-01-18 12:41:49 -0500124 srvr.routers[p][dr.Service()] = dr
125 } else {
126 srvr.routers[p] = make(map[string]Router)
127 srvr.routers[p][dr.Service()] = dr
128 }
129 }
130 }
131 // Configure the grpc handler
132 srvr.proxyServer = grpc.NewServer(
133 grpc.CustomCodec(Codec()),
134 grpc.UnknownServiceHandler(srvr.TransparentHandler()),
135 )
136
137 }
138 // Compile the regular expression to extract the method
139 mthdSlicer = regexp.MustCompile(mthdSlicerExp)
140 return srvr, nil
141}
142
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400143func (s *server) Name() string {
sslobodr392ebd52019-01-18 12:41:49 -0500144 return s.name
145}
146
147func (s *server) TransparentHandler() grpc.StreamHandler {
148 return s.handler
149}
150
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400151func (s *server) getRouter(pkg *string, service *string) (Router, bool) {
sslobodr392ebd52019-01-18 12:41:49 -0500152 if fn, ok := s.routers[*pkg][*service]; ok { // Both specified
153 return fn, ok
154 } else if fn, ok = s.routers["*"][*service]; ok { // Package wild card
155 return fn, ok
156 } else if fn, ok = s.routers[*pkg]["*"]; ok { // Service wild card
157 return fn, ok
158 } else if fn, ok = s.routers["*"]["*"]; ok { // Both Wildcarded
159 return fn, ok
160 } else {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400161 return nil, false
sslobodr392ebd52019-01-18 12:41:49 -0500162 }
163}
164
sslobodr392ebd52019-01-18 12:41:49 -0500165func (s *server) handler(srv interface{}, serverStream grpc.ServerStream) error {
166 // Determine what router is intended to handle this request
167 fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
168 if !ok {
169 return grpc.Errorf(codes.Internal, "lowLevelServerStream doesn't exist in context")
170 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400171 log.Debugf("Processing grpc request %s on server %s", fullMethodName, s.name)
sslobodr392ebd52019-01-18 12:41:49 -0500172 // The full method name is structured as follows:
173 // <package name>.<service>/<method>
174 mthdSlice := mthdSlicer.FindStringSubmatch(fullMethodName)
175 if mthdSlice == nil {
176 log.Errorf("Faled to slice full method %s, result: %v", fullMethodName, mthdSlice)
177 } else {
178 log.Debugf("Sliced full method %s: %v", fullMethodName, mthdSlice)
179 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400180 r, ok := s.getRouter(&mthdSlice[REQ_PACKAGE], &mthdSlice[REQ_SERVICE])
sslobodr392ebd52019-01-18 12:41:49 -0500181 //fn, ok := s.routers[mthdSlice[REQ_PACKAGE]][mthdSlice[REQ_SERVICE]]
182 if !ok {
183 // TODO: Should this be punted to a default transparent router??
184 // Probably not, if one is defined yes otherwise just crap out.
185
186 err := errors.New(
187 fmt.Sprintf("Unable to dispatch! Service '%s' for package '%s' not found.",
188 mthdSlice[REQ_SERVICE], mthdSlice[REQ_PACKAGE]))
189 log.Error(err)
190 return err
191 }
192 log.Debugf("Selected router %s\n", r.Name())
193
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400194 mk, mv, err := r.GetMetaKeyVal(serverStream)
sslobodr392ebd52019-01-18 12:41:49 -0500195 if err != nil {
196 log.Error(err)
197 return err
198 }
199
200 //nbR := &nbRequest(srv:srv,serverStream:serverStream,r:r,mthdSlice:mthdSlice,metaKey:mk,metaVal:mv)
201
202 // Extract the cluster from the selected router and use it to manage the transfer
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400203 if bkndClstr, err := r.BackendCluster(mthdSlice[REQ_METHOD], mk); err != nil {
sslobodr392ebd52019-01-18 12:41:49 -0500204 return err
205 } else {
206 //return bkndClstr.handler(nbR)
207 return bkndClstr.handler(srv, serverStream, r, mthdSlice, mk, mv)
208 }
sslobodr392ebd52019-01-18 12:41:49 -0500209}