blob: 17c3b4fb15857b32b959da7010f30cd4e6180ed9 [file] [log] [blame]
sslobodr392ebd52019-01-18 12:41:49 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16// gRPC affinity router with active/active backends
17
18package afrouter
19
20import (
21 "fmt"
22 "net"
23 "regexp"
24 "errors"
25 "strconv"
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/codes"
28 "github.com/opencord/voltha-go/common/log"
29)
30
31var (
32 clientStreamDescForProxying = &grpc.StreamDesc{
33 ServerStreams: true,
34 ClientStreams: true,
35 }
36)
37const (
38 REQ_ALL = 0
39 REQ_PACKAGE = 1
40 REQ_SERVICE = 2
41 REQ_METHOD = 3
42)
43
44type server struct {
45 running bool
46 name string
47 stype nbi
48 proxyListener net.Listener
49 routers map[string]map[string]Router
50 proxyServer *grpc.Server
51}
52
53type nbRequest struct {
54 srv interface{}
55 serverStream grpc.ServerStream
56 r Router
57 mthdSlice []string
58 metaKey string // There should be at most one key specified. More than one is an error.
59 metaVal string // This is the value extracted from the meta key if it exists or "" otherwise
60}
61
62var mthdSlicerExp string = `^/([a-zA-Z][a-zA-Z0-9]+)\.([a-zA-Z][a-zA-Z0-9]+)/([a-zA-Z][a-zA-Z0-9]+)`
63var mthdSlicer *regexp.Regexp // The compiled regex to extract the package/service/method
64
65
sslobodrcd37bc52019-01-24 11:47:16 -050066func newServer(config *ServerConfig) (*server,error) {
sslobodr392ebd52019-01-18 12:41:49 -050067 var err error = nil
68 var rtrn_err bool = false
69 var srvr *server
70 // Change over to the new configuration format
71 // Validate the configuration
72 // There should be a name
73 if config.Name == "" {
74 log.Error("A server has been defined with no name")
75 rtrn_err = true
76 }
77 // Validate that there's a port specified
78 if config.Port == 0 {
79 log.Errorf("Server %s does not have a valid port assigned", config.Name)
80 rtrn_err = true
81 }
82 // Validate the ip address if one is provided
83 if ip := net.ParseIP(config.Addr); config.Addr != "" && ip == nil {
84 log.Errorf("Invalid address '%s' provided for server '%s'", config.Addr, config.Name)
85 rtrn_err = true
86 }
87 if config.Type != "grpc" && config.Type != "streaming_grpc" {
88 if config.Type == "" {
89 log.Errorf("A server 'type' must be defined for server %s",config.Name)
90 } else {
91 log.Errorf("The server type must be either 'grpc' or 'streaming_grpc' "+
92 "but '%s' was found for server '%s'", config.Type, config.Name)
93 }
94 rtrn_err = true
95 }
96 if len(config.Routers) == 0 {
97 log.Errorf("At least one router must be specified for server '%s'", config.Name)
98 rtrn_err = true
99 }
100
101 if rtrn_err == true {
102 return nil, errors.New("Server configuration failed")
103 } else {
104 // The configuration is valid, create a server and configure it.
105 srvr = &server{name:config.Name,routers:make(map[string]map[string]Router)}
106 // The listener
107 if srvr.proxyListener, err =
108 net.Listen("tcp", config.Addr + ":"+
109 strconv.Itoa(int(config.Port))); err != nil {
110 log.Error(err)
111 return nil, err
112 }
113 // Create the routers
114 log.Debugf("Configuring the routers for server %s", srvr.name)
sslobodr5f0b5a32019-01-24 07:45:19 -0500115 for p,r := range config.routers {
sslobodr392ebd52019-01-18 12:41:49 -0500116 log.Debugf("Processing router %s for package %s", r.Name,p)
sslobodrcd37bc52019-01-24 11:47:16 -0500117 if dr,err := newRouter(r); err != nil {
sslobodr392ebd52019-01-18 12:41:49 -0500118 log.Error(err)
119 return nil, err
120 } else {
121 log.Debugf("Adding router %s to the server %s for package %s and service %s",
122 dr.Name(), srvr.name, p, dr.Service())
123 if _,ok := srvr.routers[p]; ok {
124 srvr.routers[p][dr.Service()] = dr
125 } else {
126 srvr.routers[p] = make(map[string]Router)
127 srvr.routers[p][dr.Service()] = dr
128 }
129 }
130 }
131 // Configure the grpc handler
132 srvr.proxyServer = grpc.NewServer(
133 grpc.CustomCodec(Codec()),
134 grpc.UnknownServiceHandler(srvr.TransparentHandler()),
135 )
136
137 }
138 // Compile the regular expression to extract the method
139 mthdSlicer = regexp.MustCompile(mthdSlicerExp)
140 return srvr, nil
141}
142
143func (s *server) Name() (string) {
144 return s.name
145}
146
147func (s *server) TransparentHandler() grpc.StreamHandler {
148 return s.handler
149}
150
151
152func (s *server) getRouter(pkg *string, service *string) (Router,bool) {
153 if fn, ok := s.routers[*pkg][*service]; ok { // Both specified
154 return fn, ok
155 } else if fn, ok = s.routers["*"][*service]; ok { // Package wild card
156 return fn, ok
157 } else if fn, ok = s.routers[*pkg]["*"]; ok { // Service wild card
158 return fn, ok
159 } else if fn, ok = s.routers["*"]["*"]; ok { // Both Wildcarded
160 return fn, ok
161 } else {
162 return nil,false
163 }
164}
165
166
167func (s *server) handler(srv interface{}, serverStream grpc.ServerStream) error {
168 // Determine what router is intended to handle this request
169 fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
170 if !ok {
171 return grpc.Errorf(codes.Internal, "lowLevelServerStream doesn't exist in context")
172 }
173 log.Debugf("Processing grpc request %s on server %s",fullMethodName,s.name)
174 // The full method name is structured as follows:
175 // <package name>.<service>/<method>
176 mthdSlice := mthdSlicer.FindStringSubmatch(fullMethodName)
177 if mthdSlice == nil {
178 log.Errorf("Faled to slice full method %s, result: %v", fullMethodName, mthdSlice)
179 } else {
180 log.Debugf("Sliced full method %s: %v", fullMethodName, mthdSlice)
181 }
182 r, ok := s.getRouter(&mthdSlice[REQ_PACKAGE],&mthdSlice[REQ_SERVICE])
183 //fn, ok := s.routers[mthdSlice[REQ_PACKAGE]][mthdSlice[REQ_SERVICE]]
184 if !ok {
185 // TODO: Should this be punted to a default transparent router??
186 // Probably not, if one is defined yes otherwise just crap out.
187
188 err := errors.New(
189 fmt.Sprintf("Unable to dispatch! Service '%s' for package '%s' not found.",
190 mthdSlice[REQ_SERVICE], mthdSlice[REQ_PACKAGE]))
191 log.Error(err)
192 return err
193 }
194 log.Debugf("Selected router %s\n", r.Name())
195
196 mk,mv,err := r.GetMetaKeyVal(serverStream)
197 if err != nil {
198 log.Error(err)
199 return err
200 }
201
202 //nbR := &nbRequest(srv:srv,serverStream:serverStream,r:r,mthdSlice:mthdSlice,metaKey:mk,metaVal:mv)
203
204 // Extract the cluster from the selected router and use it to manage the transfer
205 if bkndClstr,err := r.BackendCluster(mthdSlice[REQ_METHOD], mk); err != nil {
206 return err
207 } else {
208 //return bkndClstr.handler(nbR)
209 return bkndClstr.handler(srv, serverStream, r, mthdSlice, mk, mv)
210 }
211
212 return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
213}
214