blob: c9f94efb9c950c79e4d4e2efdc66b64ca59ee094 [file] [log] [blame]
Scott Baker112b0d42019-08-22 08:32:26 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package afrouter
18
19/* Source-Router
20
21 This router implements source routing where the caller identifies the
22 component the message should be routed to. The `RouteField` should be
23 configured with the gRPC field name to inspect to determine the
24 destination. This field is assumed to be a string. That string will
25 then be used to identify a particular connection on a particular
26 backend.
27
28 The source-router must be configured with a backend cluster, as all routers
29 must identify a backend cluster. However, that backend cluster
30 is merely a placeholder and is not used by the source-router. The
31 source-router's Route() function will return whatever backend cluster is
32 specified by the `RouteField`.
33*/
34
35import (
36 "errors"
37 "fmt"
38 "github.com/golang/protobuf/proto"
39 pb "github.com/golang/protobuf/protoc-gen-go/descriptor"
40 "github.com/opencord/voltha-go/common/log"
41 "google.golang.org/grpc"
42 "io/ioutil"
43 "regexp"
44 "strconv"
45)
46
47type SourceRouter struct {
48 name string
49 //association associationType
50 routingField string
51 grpcService string
52 protoDescriptor *pb.FileDescriptorSet
53 methodMap map[string]byte
54 cluster *cluster
55}
56
57func newSourceRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
58 var err error = nil
59 var rtrn_err = false
60 var pkg_re = regexp.MustCompile(`^(\.[^.]+\.)(.+)$`)
61 // Validate the configuration
62
63 // A name must exist
64 if config.Name == "" {
65 log.Error("A router 'name' must be specified")
66 rtrn_err = true
67 }
68
69 if rconf.ProtoPackage == "" {
70 log.Error("A 'package' must be specified")
71 rtrn_err = true
72 }
73
74 if rconf.ProtoService == "" {
75 log.Error("A 'service' must be specified")
76 rtrn_err = true
77 }
78
79 if config.RouteField == "" {
80 log.Error("A 'routing_field' must be specified")
81 rtrn_err = true
82 }
83
84 // TODO The overrieds section is currently not being used
85 // so the router will route all methods based on the
86 // routing_field. This needs to be added so that methods
87 // can have different routing fields.
88 dr := SourceRouter{
89 name: config.Name,
90 grpcService: rconf.ProtoService,
91 methodMap: make(map[string]byte),
92 }
93
94 // Load the protobuf descriptor file
95 dr.protoDescriptor = &pb.FileDescriptorSet{}
96 fb, err := ioutil.ReadFile(rconf.ProtoFile)
97 if err != nil {
98 log.Errorf("Could not open proto file '%s'", rconf.ProtoFile)
99 rtrn_err = true
100 }
101 err = proto.Unmarshal(fb, dr.protoDescriptor)
102 if err != nil {
103 log.Errorf("Could not unmarshal %s, %v", "proto.pb", err)
104 rtrn_err = true
105 }
106
107 // Build the routing structure based on the loaded protobuf
108 // descriptor file and the config information.
109 type key struct {
110 method string
111 field string
112 }
113 var msgs = make(map[key]byte)
114 for _, f := range dr.protoDescriptor.File {
115 // Build a temporary map of message types by name.
116 for _, m := range f.MessageType {
117 for _, fld := range m.Field {
118 log.Debugf("Processing message '%s', field '%s'", *m.Name, *fld.Name)
119 msgs[key{*m.Name, *fld.Name}] = byte(*fld.Number)
120 }
121 }
122 }
123 log.Debugf("The map contains: %v", msgs)
124 for _, f := range dr.protoDescriptor.File {
125 if *f.Package == rconf.ProtoPackage {
126 for _, s := range f.Service {
127 if *s.Name == rconf.ProtoService {
128 log.Debugf("Loading package data '%s' for service '%s' for router '%s'", *f.Package, *s.Name, dr.name)
129 // Now create a map keyed by method name with the value being the
130 // field number of the route selector.
131 var ok bool
132 for _, m := range s.Method {
133 // Find the input type in the messages and extract the
134 // field number and save it for future reference.
135 log.Debugf("Processing method '%s'", *m.Name)
136 // Determine if this is a method we're supposed to be processing.
137 if needMethod(*m.Name, config) {
138 log.Debugf("Enabling method '%s'", *m.Name)
139 pkg_methd := pkg_re.FindStringSubmatch(*m.InputType)
140 if pkg_methd == nil {
141 log.Errorf("Regular expression didn't match input type '%s'", *m.InputType)
142 rtrn_err = true
143 }
144 // The input type has the package name prepended to it. Remove it.
145 //in := (*m.InputType)[len(rconf.ProtoPackage)+2:]
146 in := pkg_methd[PKG_MTHD_MTHD]
147 dr.methodMap[*m.Name], ok = msgs[key{in, config.RouteField}]
148 if !ok {
149 log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
150 *m.Name, config.RouteField, in)
151 rtrn_err = true
152 }
153 }
154 }
155 }
156 }
157 }
158 }
159
160 // We need to pick a cluster, because server will call cluster.handler. The choice we make doesn't
161 // matter, as we can return a different cluster from Route().
162 ok := true
163 if dr.cluster, ok = clusters[config.backendCluster.Name]; !ok {
164 if dr.cluster, err = newBackendCluster(config.backendCluster); err != nil {
165 log.Errorf("Could not create a backend for router %s", config.Name)
166 rtrn_err = true
167 }
168 }
169
170 if rtrn_err {
171 return dr, errors.New(fmt.Sprintf("Failed to create a new router '%s'", dr.name))
172 }
173
174 return dr, nil
175}
176
177func (ar SourceRouter) Service() string {
178 return ar.grpcService
179}
180
181func (ar SourceRouter) Name() string {
182 return ar.name
183}
184
185func (ar SourceRouter) skipField(data *[]byte, idx *int) error {
186 switch (*data)[*idx] & 3 {
187 case 0: // Varint
188 // skip the field number/type
189 *idx++
190 // if the msb is set, then more bytes to follow
191 for (*data)[*idx] >= 128 {
192 *idx++
193 }
194 // the last byte doesn't have the msb set
195 *idx++
196 case 1: // 64 bit
197 *idx += 9
198 case 2: // Length delimited
199 // skip the field number / type
200 *idx++
201 // read a varint that tells length of string
202 b := proto.NewBuffer((*data)[*idx:])
203 t, _ := b.DecodeVarint()
204 // skip the length varint and the string bytes
205 // TODO: This assumes the varint was one byte long -- max string length is 127 bytes
206 *idx += int(t) + 1
207 case 3: // Deprecated
208 case 4: // Deprecated
209 case 5: // 32 bit
210 *idx += 5
211 }
212 return nil
213}
214
215func (ar SourceRouter) decodeProtoField(payload []byte, fieldId byte) (string, error) {
216 idx := 0
217 b := proto.NewBuffer([]byte{})
218 //b.DebugPrint("The Buffer", payload)
219 for { // Find the route selector field
220 log.Debugf("Decoding source value attributeNumber: %d from %v at index %d", fieldId, payload, idx)
221 log.Debugf("Attempting match with payload: %d, methodTable: %d", payload[idx], fieldId)
222 if payload[idx]>>3 == fieldId {
223 log.Debugf("Method match with payload: %d, methodTable: %d", payload[idx], fieldId)
224 // TODO: Consider supporting other selector types.... Way, way in the future
225 // ok, the future is now, support strings as well... ugh.
226 var selector string
227 switch payload[idx] & 3 {
228 case 0: // Integer
229 b.SetBuf(payload[idx+1:])
230 v, e := b.DecodeVarint()
231 if e == nil {
232 log.Debugf("Decoded the ing field: %v", v)
233 selector = strconv.Itoa(int(v))
234 } else {
235 log.Errorf("Failed to decode varint %v", e)
236 return "", e
237 }
238 case 2: // Length delimited AKA string
239 b.SetBuf(payload[idx+1:])
240 v, e := b.DecodeStringBytes()
241 if e == nil {
242 log.Debugf("Decoded the string field: %v", v)
243 selector = v
244 } else {
245 log.Errorf("Failed to decode string %v", e)
246 return "", e
247 }
248 default:
249 err := errors.New(fmt.Sprintf("Only integer and string route selectors are permitted"))
250 log.Error(err)
251 return "", err
252 }
253 return selector, nil
254 } else if err := ar.skipField(&payload, &idx); err != nil {
255 log.Errorf("Parsing message failed %v", err)
256 return "", err
257 }
258 }
259}
260
261func (ar SourceRouter) Route(sel interface{}) (*backend, *connection) {
262 log.Debugf("SourceRouter sel %v", sel)
263 switch sl := sel.(type) {
264 case *requestFrame:
265 log.Debugf("Route called for nbFrame with method %s", sl.methodInfo.method)
266 // Not a south affinity binding method, proceed with north affinity binding.
267 if selector, err := ar.decodeProtoField(sl.payload, ar.methodMap[sl.methodInfo.method]); err == nil {
268 // selector is
269
270 for _, cluster := range clusters {
271 for _, backend := range cluster.backends {
272 log.Debugf("Checking backend %s", backend.name)
273 for _, connection := range backend.connections {
274 log.Debugf("Checking connection %s", connection.name)
275 // caller specified a backend and a connection
276 if backend.name+"."+connection.name == selector {
277 return backend, connection
278 }
279 }
280 // caller specified just a backend
281 if backend.name == selector {
282 return backend, nil
283 }
284 }
285 }
286 sl.err = fmt.Errorf("Backend %s not found", selector)
287 return nil, nil
288 }
289 default:
290 log.Errorf("Internal: invalid data type in Route call %v", sel)
291 return nil, nil
292 }
293 log.Errorf("Bad routing in SourceRouter:Route")
294 return nil, nil
295}
296
297func (ar SourceRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error) {
298 return "", "", nil
299}
300
301func (ar SourceRouter) IsStreaming(_ string) (bool, bool) {
302 panic("not implemented")
303}
304
305func (ar SourceRouter) BackendCluster(mthd string, metaKey string) (*cluster, error) {
306 // unsupported?
307 return ar.cluster, nil
308}
309
310func (ar SourceRouter) FindBackendCluster(beName string) *cluster {
311 // unsupported?
312 if beName == ar.cluster.name {
313 return ar.cluster
314 }
315 return nil
316}
317
318func (rr SourceRouter) ReplyHandler(sel interface{}) error { // This is a no-op
319 return nil
320}