blob: 879246b40a35cd80a92837c78c6704cd8d0fcfe5 [file] [log] [blame]
Kent Hagerman1e9061e2019-05-21 16:01:21 -04001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package afrouter
18
19import (
20 "errors"
21 "fmt"
22 "github.com/opencord/voltha-go/common/log"
23 "google.golang.org/grpc"
24 "sync/atomic"
25)
26
27var clusters = make(map[string]*cluster)
28
29// cluster a collection of HA backends
30type cluster struct {
31 name string
32 //backends map[string]*backend
33 backends []*backend
34 backendIDMap map[*backend]int
35 serialNoCounter uint64
36}
37
38//TODO: Move the backend type (active/active etc) to the cluster
39// level. All backends should really be of the same type.
40// Create a new backend cluster
41func newBackendCluster(conf *BackendClusterConfig) (*cluster, error) {
42 var err error = nil
43 var rtrn_err = false
44 var be *backend
45 log.Debugf("Creating a backend cluster with %v", conf)
46 // Validate the configuration
47 if conf.Name == "" {
48 log.Error("A backend cluster must have a name")
49 rtrn_err = true
50 }
51 //bc := &cluster{name:conf.Name,backends:make(map[string]*backend)}
52 bc := &cluster{name: conf.Name, backendIDMap: make(map[*backend]int)}
53 clusters[bc.name] = bc
54 idx := 0
55 for _, bec := range conf.Backends {
56 if bec.Name == "" {
57 log.Errorf("A backend must have a name in cluster %s\n", conf.Name)
58 rtrn_err = true
59 }
60 if be, err = newBackend(&bec, conf.Name); err != nil {
61 log.Errorf("Error creating backend %s", bec.Name)
62 rtrn_err = true
63 }
64 bc.backends = append(bc.backends, be)
65 bc.backendIDMap[bc.backends[idx]] = idx
66 idx++
67 }
68 if rtrn_err {
69 return nil, errors.New("Error creating backend(s)")
70 }
71 return bc, nil
72}
73
74func (c *cluster) getBackend(name string) *backend {
75 for _, v := range c.backends {
76 if v.name == name {
77 return v
78 }
79 }
80 return nil
81}
82
83func (c *cluster) allocateSerialNumber() uint64 {
84 return atomic.AddUint64(&c.serialNoCounter, 1) - 1
85}
86
87func (c *cluster) nextBackend(be *backend, seq backendSequence) (*backend, error) {
88 switch seq {
89 case BackendSequenceRoundRobin: // Round robin
90 in := be
91 // If no backend is found having a connection
92 // then return nil.
93 if be == nil {
94 log.Debug("Previous backend is nil")
95 be = c.backends[0]
96 in = be
97 if be.openConns != 0 {
98 return be, nil
99 }
100 }
101 for {
102 log.Debugf("Requesting a new backend starting from %s", be.name)
103 cur := c.backendIDMap[be]
104 cur++
105 if cur >= len(c.backends) {
106 cur = 0
107 }
108 log.Debugf("Next backend is %d:%s", cur, c.backends[cur].name)
109 if c.backends[cur].openConns > 0 {
110 return c.backends[cur], nil
111 }
112 if c.backends[cur] == in {
113 err := fmt.Errorf("No backend with open connections found")
114 log.Debug(err)
115 return nil, err
116 }
117 be = c.backends[cur]
118 log.Debugf("Backend '%s' has no open connections, trying next", c.backends[cur].name)
119 }
120 default: // Invalid, default to round robin
121 log.Errorf("Invalid backend sequence %d. Defaulting to round robin", seq)
122 return c.nextBackend(be, BackendSequenceRoundRobin)
123 }
124}
125
126func (c *cluster) handler(srv interface{}, serverStream grpc.ServerStream, r Router, methodInfo methodDetails,
127 mk string, mv string) error {
128 //func (c *cluster) handler(nbR * nbRequest) error {
129
130 // The final backend cluster needs to be determined here. With non-affinity routed backends it could
131 // just be determined here and for affinity routed backends the first message must be received
132 // before the backend is determined. In order to keep things simple, the same approach is taken for
133 // now.
134
135 // Get the backend to use.
136 // Allocate the nbFrame here since it holds the "context" of this communication
137 nf := &nbFrame{router: r, methodInfo: methodInfo, serialNo: c.allocateSerialNumber(), metaKey: mk, metaVal: mv}
138 log.Debugf("Nb frame allocate with method %s", nf.methodInfo.method)
139
140 if be, err := c.assignBackend(serverStream, nf); err != nil {
141 // At this point, no backend streams have been initiated
142 // so just return the error.
143 return err
144 } else {
145 log.Debugf("Backend '%s' selected", be.name)
146 // Allocate a sbFrame here because it might be needed for return value intercept
147 sf := &sbFrame{router: r, backend: be, method: nf.methodInfo.method, metaKey: mk, metaVal: mv}
148 log.Debugf("Sb frame allocated with router %s", r.Name())
149 return be.handler(srv, serverStream, nf, sf)
150 }
151}
152
153func (c *cluster) assignBackend(src grpc.ServerStream, f *nbFrame) (*backend, error) {
154 // Receive the first message from the server. This calls the assigned codec in which
155 // Unmarshal gets executed. That will use the assigned router to select a backend
156 // and add it to the frame
157 if err := src.RecvMsg(f); err != nil {
158 return nil, err
159 }
160 // Check that the backend was routable and actually has connections open.
161 // If it doesn't then return a nil backend to indicate this
162 if f.backend == nil {
163 err := fmt.Errorf("Unable to route method '%s'", f.methodInfo.method)
164 log.Error(err)
165 return nil, err
166 } else if f.backend.openConns == 0 {
167 err := fmt.Errorf("No open connections on backend '%s'", f.backend.name)
168 log.Error(err)
169 return f.backend, err
170 }
171 return f.backend, nil
172}