blob: 239084657876c7ae86f1b6d5eb8f03294ee743be [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
sslobodr392ebd52019-01-18 12:41:49 -050016
17package afrouter
18
19import (
sslobodr392ebd52019-01-18 12:41:49 -050020 "errors"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040021 "fmt"
22 "github.com/golang/protobuf/proto"
23 pb "github.com/golang/protobuf/protoc-gen-go/descriptor"
24 "github.com/opencord/voltha-go/common/log"
25 "google.golang.org/grpc"
26 "io/ioutil"
sslobodr1d1e50b2019-03-14 09:17:40 -040027 "regexp"
sslobodr392ebd52019-01-18 12:41:49 -050028 "strconv"
sslobodr392ebd52019-01-18 12:41:49 -050029)
30
// Indexes into the submatch slice produced by the regular expression that
// splits a fully qualified protobuf type name (see newAffinityRouter).
const (
	// PKG_MTHD_PKG is the index of the first capture group.
	PKG_MTHD_PKG int = iota + 1
	// PKG_MTHD_MTHD is the index of the second capture group.
	PKG_MTHD_MTHD
)
35
sslobodr392ebd52019-01-18 12:41:49 -050036type AffinityRouter struct {
Kent Hagerman1e9061e2019-05-21 16:01:21 -040037 name string
38 association associationType
39 routingField string
40 grpcService string
41 protoDescriptor *pb.FileDescriptorSet
42 methodMap map[string]byte
43 nbBindingMethodMap map[string]byte
44 cluster *cluster
45 affinity map[string]*backend
46 currentBackend **backend
sslobodr392ebd52019-01-18 12:41:49 -050047}
48
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040049func newAffinityRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
sslobodr392ebd52019-01-18 12:41:49 -050050 var err error = nil
Kent Hagerman1e9061e2019-05-21 16:01:21 -040051 var rtrn_err = false
52 var pkg_re = regexp.MustCompile(`^(\.[^.]+\.)(.+)$`)
sslobodr392ebd52019-01-18 12:41:49 -050053 // Validate the configuration
54
55 // A name must exist
56 if config.Name == "" {
57 log.Error("A router 'name' must be specified")
58 rtrn_err = true
59 }
60
61 if rconf.ProtoPackage == "" {
62 log.Error("A 'package' must be specified")
63 rtrn_err = true
64 }
65
66 if rconf.ProtoService == "" {
67 log.Error("A 'service' must be specified")
68 rtrn_err = true
69 }
70
71 //if config.RouteField == "" {
72 // log.Error("A 'routing_field' must be specified")
73 // rtrn_err = true
74 //}
75
76 // TODO The overrieds section is currently not being used
77 // so the router will route all methods based on the
78 // routing_field. This needs to be added so that methods
79 // can have different routing fields.
80 var bptr *backend
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040081 bptr = nil
sslobodr392ebd52019-01-18 12:41:49 -050082 dr := AffinityRouter{
Kent Hagerman1e9061e2019-05-21 16:01:21 -040083 name: config.Name,
84 grpcService: rconf.ProtoService,
85 affinity: make(map[string]*backend),
86 methodMap: make(map[string]byte),
87 nbBindingMethodMap: make(map[string]byte),
88 currentBackend: &bptr,
sslobodr392ebd52019-01-18 12:41:49 -050089 }
90 // An association must exist
Kent Hagerman1e9061e2019-05-21 16:01:21 -040091 dr.association = config.Association
92 if dr.association == AssociationUndefined {
93 log.Error("An association must be specified")
sslobodr392ebd52019-01-18 12:41:49 -050094 rtrn_err = true
95 }
96
sslobodr392ebd52019-01-18 12:41:49 -050097 // Load the protobuf descriptor file
98 dr.protoDescriptor = &pb.FileDescriptorSet{}
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040099 fb, err := ioutil.ReadFile(config.ProtoFile)
sslobodr392ebd52019-01-18 12:41:49 -0500100 if err != nil {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400101 log.Errorf("Could not open proto file '%s'", config.ProtoFile)
sslobodr392ebd52019-01-18 12:41:49 -0500102 rtrn_err = true
103 }
104 err = proto.Unmarshal(fb, dr.protoDescriptor)
105 if err != nil {
106 log.Errorf("Could not unmarshal %s, %v", "proto.pb", err)
107 rtrn_err = true
108 }
109
sslobodr392ebd52019-01-18 12:41:49 -0500110 // Build the routing structure based on the loaded protobuf
111 // descriptor file and the config information.
112 type key struct {
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400113 method string
114 field string
sslobodr392ebd52019-01-18 12:41:49 -0500115 }
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400116 var msgs = make(map[key]byte)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400117 for _, f := range dr.protoDescriptor.File {
sslobodr392ebd52019-01-18 12:41:49 -0500118 // Build a temporary map of message types by name.
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400119 for _, m := range f.MessageType {
120 for _, fld := range m.Field {
sslobodr392ebd52019-01-18 12:41:49 -0500121 log.Debugf("Processing message '%s', field '%s'", *m.Name, *fld.Name)
122 msgs[key{*m.Name, *fld.Name}] = byte(*fld.Number)
123 }
124 }
125 }
126 log.Debugf("The map contains: %v", msgs)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400127 for _, f := range dr.protoDescriptor.File {
sslobodr392ebd52019-01-18 12:41:49 -0500128 if *f.Package == rconf.ProtoPackage {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400129 for _, s := range f.Service {
sslobodr392ebd52019-01-18 12:41:49 -0500130 if *s.Name == rconf.ProtoService {
131 log.Debugf("Loading package data '%s' for service '%s' for router '%s'", *f.Package, *s.Name, dr.name)
132 // Now create a map keyed by method name with the value being the
133 // field number of the route selector.
134 var ok bool
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400135 for _, m := range s.Method {
sslobodr392ebd52019-01-18 12:41:49 -0500136 // Find the input type in the messages and extract the
137 // field number and save it for future reference.
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400138 log.Debugf("Processing method '%s'", *m.Name)
sslobodr392ebd52019-01-18 12:41:49 -0500139 // Determine if this is a method we're supposed to be processing.
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400140 if needMethod(*m.Name, config) {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400141 log.Debugf("Enabling method '%s'", *m.Name)
sslobodr1d1e50b2019-03-14 09:17:40 -0400142 pkg_methd := pkg_re.FindStringSubmatch(*m.InputType)
143 if pkg_methd == nil {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400144 log.Errorf("Regular expression didn't match input type '%s'", *m.InputType)
sslobodr1d1e50b2019-03-14 09:17:40 -0400145 rtrn_err = true
146 }
sslobodr392ebd52019-01-18 12:41:49 -0500147 // The input type has the package name prepended to it. Remove it.
sslobodr1d1e50b2019-03-14 09:17:40 -0400148 //in := (*m.InputType)[len(rconf.ProtoPackage)+2:]
149 in := pkg_methd[PKG_MTHD_MTHD]
sslobodr392ebd52019-01-18 12:41:49 -0500150 dr.methodMap[*m.Name], ok = msgs[key{in, config.RouteField}]
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400151 if !ok {
sslobodr392ebd52019-01-18 12:41:49 -0500152 log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400153 *m.Name, config.RouteField, in)
sslobodr392ebd52019-01-18 12:41:49 -0500154 rtrn_err = true
155 }
156 }
157 // The sb method is always included in the methods so we can check it here too.
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400158 if needSbMethod(*m.Name, config) {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400159 log.Debugf("Enabling southbound method '%s'", *m.Name)
sslobodr392ebd52019-01-18 12:41:49 -0500160 // The output type has the package name prepended to it. Remove it.
161 out := (*m.OutputType)[len(rconf.ProtoPackage)+2:]
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400162 dr.nbBindingMethodMap[*m.Name], ok = msgs[key{out, config.RouteField}]
163 if !ok {
sslobodr392ebd52019-01-18 12:41:49 -0500164 log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400165 *m.Name, config.RouteField, out)
sslobodr392ebd52019-01-18 12:41:49 -0500166 rtrn_err = true
167 }
168 }
169 }
170 }
171 }
172 }
173 }
174
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400175 // Create the backend cluster or link to an existing one
sslobodr392ebd52019-01-18 12:41:49 -0500176 ok := true
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400177 if dr.cluster, ok = clusters[config.backendCluster.Name]; !ok {
178 if dr.cluster, err = newBackendCluster(config.backendCluster); err != nil {
sslobodr392ebd52019-01-18 12:41:49 -0500179 log.Errorf("Could not create a backend for router %s", config.Name)
180 rtrn_err = true
181 }
182 }
183
184 if rtrn_err {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400185 return dr, errors.New(fmt.Sprintf("Failed to create a new router '%s'", dr.name))
sslobodr392ebd52019-01-18 12:41:49 -0500186 }
187
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400188 return dr, nil
sslobodr392ebd52019-01-18 12:41:49 -0500189}
190
191func needSbMethod(mthd string, conf *RouteConfig) bool {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400192 for _, m := range conf.NbBindingMethods {
sslobodr392ebd52019-01-18 12:41:49 -0500193 if mthd == m {
194 return true
195 }
196 }
197 return false
198}
199
200func needMethod(mthd string, conf *RouteConfig) bool {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400201 for _, m := range conf.Methods {
sslobodr392ebd52019-01-18 12:41:49 -0500202 if mthd == m {
203 return true
204 }
205 }
206 return false
207}
208
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400209func (ar AffinityRouter) Service() string {
210 return ar.grpcService
sslobodr392ebd52019-01-18 12:41:49 -0500211}
212
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400213func (ar AffinityRouter) Name() string {
214 return ar.name
sslobodr392ebd52019-01-18 12:41:49 -0500215}
216
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400217func (ar AffinityRouter) skipField(data *[]byte, idx *int) error {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400218 switch (*data)[*idx] & 3 {
219 case 0: // Varint
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400220 *idx++
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400221 for (*data)[*idx] >= 128 {
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400222 *idx++
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400223 }
224 case 1: // 64 bit
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400225 *idx += 9
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400226 case 2: // Length delimited
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400227 *idx++
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400228 b := proto.NewBuffer((*data)[*idx:])
229 t, _ := b.DecodeVarint()
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400230 *idx += int(t) + 1
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400231 case 3: // Deprecated
232 case 4: // Deprecated
233 case 5: // 32 bit
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400234 *idx += 5
sslobodr392ebd52019-01-18 12:41:49 -0500235 }
236 return nil
237}
238
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400239func (ar AffinityRouter) decodeProtoField(payload []byte, fieldId byte) (string, error) {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400240 idx := 0
sslobodr392ebd52019-01-18 12:41:49 -0500241 b := proto.NewBuffer([]byte{})
sslobodr1d1e50b2019-03-14 09:17:40 -0400242 //b.DebugPrint("The Buffer", payload)
sslobodr392ebd52019-01-18 12:41:49 -0500243 for { // Find the route selector field
244 log.Debugf("Decoding afinity value attributeNumber: %d from %v at index %d", fieldId, payload, idx)
245 log.Debugf("Attempting match with payload: %d, methodTable: %d", payload[idx], fieldId)
246 if payload[idx]>>3 == fieldId {
247 log.Debugf("Method match with payload: %d, methodTable: %d", payload[idx], fieldId)
248 // TODO: Consider supporting other selector types.... Way, way in the future
249 // ok, the future is now, support strings as well... ugh.
250 var selector string
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400251 switch payload[idx] & 3 {
252 case 0: // Integer
253 b.SetBuf(payload[idx+1:])
254 v, e := b.DecodeVarint()
255 if e == nil {
256 log.Debugf("Decoded the ing field: %v", v)
257 selector = strconv.Itoa(int(v))
258 } else {
259 log.Errorf("Failed to decode varint %v", e)
260 return "", e
261 }
262 case 2: // Length delimited AKA string
263 b.SetBuf(payload[idx+1:])
264 v, e := b.DecodeStringBytes()
265 if e == nil {
266 log.Debugf("Decoded the string field: %v", v)
267 selector = v
268 } else {
269 log.Errorf("Failed to decode string %v", e)
270 return "", e
271 }
272 default:
273 err := errors.New(fmt.Sprintf("Only integer and string route selectors are permitted"))
274 log.Error(err)
275 return "", err
sslobodr392ebd52019-01-18 12:41:49 -0500276 }
277 return selector, nil
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400278 } else if err := ar.skipField(&payload, &idx); err != nil {
sslobodr392ebd52019-01-18 12:41:49 -0500279 log.Errorf("Parsing message failed %v", err)
280 return "", err
281 }
282 }
283}
284
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400285func (ar AffinityRouter) Route(sel interface{}) *backend {
sslobodr392ebd52019-01-18 12:41:49 -0500286 switch sl := sel.(type) {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400287 case *nbFrame:
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400288 log.Debugf("Route called for nbFrame with method %s", sl.methodInfo.method)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400289 // Check if this method should be affinity bound from the
290 // reply rather than the request.
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400291 if _, ok := ar.nbBindingMethodMap[sl.methodInfo.method]; ok {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400292 var err error
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400293 log.Debugf("Method '%s' affinity binds on reply", sl.methodInfo.method)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400294 // Just round robin route the southbound request
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400295 if *ar.currentBackend, err = ar.cluster.nextBackend(*ar.currentBackend, BackendSequenceRoundRobin); err == nil {
296 return *ar.currentBackend
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400297 } else {
298 sl.err = err
299 return nil
300 }
301 }
302 // Not a south affinity binding method, proceed with north affinity binding.
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400303 if selector, err := ar.decodeProtoField(sl.payload, ar.methodMap[sl.methodInfo.method]); err == nil {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400304 log.Debugf("Establishing affinity for selector: %s", selector)
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400305 if rtrn, ok := ar.affinity[selector]; ok {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400306 return rtrn
307 } else {
308 // The selector isn't in the map, create a new affinity mapping
309 log.Debugf("MUST CREATE A NEW AFFINITY MAP ENTRY!!")
sslobodr392ebd52019-01-18 12:41:49 -0500310 var err error
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400311 if *ar.currentBackend, err = ar.cluster.nextBackend(*ar.currentBackend, BackendSequenceRoundRobin); err == nil {
312 ar.setAffinity(selector, *ar.currentBackend)
313 //ar.affinity[selector] = *ar.currentBackend
314 //log.Debugf("New affinity set to backend %s",(*ar.currentBackend).name)
315 return *ar.currentBackend
sslobodr392ebd52019-01-18 12:41:49 -0500316 } else {
317 sl.err = err
318 return nil
319 }
320 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400321 }
322 default:
323 log.Errorf("Internal: invalid data type in Route call %v", sel)
324 return nil
sslobodr392ebd52019-01-18 12:41:49 -0500325 }
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400326 log.Errorf("Bad lookup in affinity map %v", ar.affinity)
sslobodr392ebd52019-01-18 12:41:49 -0500327 return nil
328}
329
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400330func (ar AffinityRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error) {
331 return "", "", nil
sslobodr392ebd52019-01-18 12:41:49 -0500332}
333
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400334func (ar AffinityRouter) BackendCluster(mthd string, metaKey string) (*cluster, error) {
335 return ar.cluster, nil
sslobodr392ebd52019-01-18 12:41:49 -0500336}
337
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400338func (ar AffinityRouter) FindBackendCluster(beName string) *cluster {
339 if beName == ar.cluster.name {
340 return ar.cluster
sslobodr392ebd52019-01-18 12:41:49 -0500341 }
342 return nil
343}
344
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400345func (ar AffinityRouter) ReplyHandler(sel interface{}) error {
sslobodr392ebd52019-01-18 12:41:49 -0500346 switch sl := sel.(type) {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400347 case *sbFrame:
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400348 sl.mutex.Lock()
349 defer sl.mutex.Unlock()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400350 log.Debugf("Reply handler called for sbFrame with method %s", sl.method)
351 // Determine if reply action is required.
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400352 if fld, ok := ar.nbBindingMethodMap[sl.method]; ok && len(sl.payload) > 0 {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400353 // Extract the field value from the frame and
354 // and set affinity accordingly
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400355 if selector, err := ar.decodeProtoField(sl.payload, fld); err == nil {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400356 log.Debug("Settign affinity on reply")
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400357 if ar.setAffinity(selector, sl.backend) != nil {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400358 log.Error("Setting affinity on reply failed")
sslobodr392ebd52019-01-18 12:41:49 -0500359 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400360 return nil
361 } else {
362 err := errors.New(fmt.Sprintf("Failed to decode reply field %d for method %s", fld, sl.method))
363 log.Error(err)
364 return err
sslobodr392ebd52019-01-18 12:41:49 -0500365 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400366 }
367 return nil
368 default:
369 err := errors.New(fmt.Sprintf("Internal: invalid data type in ReplyHander call %v", sl))
370 log.Error(err)
371 return err
sslobodr392ebd52019-01-18 12:41:49 -0500372 }
373}
374
375func (ar AffinityRouter) setAffinity(key string, be *backend) error {
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400376 if be2, ok := ar.affinity[key]; !ok {
sslobodr392ebd52019-01-18 12:41:49 -0500377 ar.affinity[key] = be
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400378 log.Debugf("New affinity set to backend %s for key %s", be.name, key)
sslobodr392ebd52019-01-18 12:41:49 -0500379 } else if be2 != be {
380 err := errors.New(fmt.Sprintf("Attempting multiple sets of affinity for key %s to backend %s from %s on router %s",
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400381 key, be.name, ar.affinity[key].name, ar.name))
sslobodr392ebd52019-01-18 12:41:49 -0500382 log.Error(err)
383 return err
384 }
385 return nil
386}