/*
 * Copyright 2019-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package afrouter
18
19import (
20 "errors"
21 "fmt"
22 "github.com/google/uuid"
23 "github.com/opencord/voltha-go/common/log"
24 "google.golang.org/grpc"
25)
26
27var clusters = make(map[string]*cluster)
28
29// cluster a collection of HA backends
30type cluster struct {
31 name string
32 //backends map[string]*backend
33 backends []*backend
34 backendIDMap map[*backend]int
35}
36
37//TODO: Move the backend type (active/active etc) to the cluster
38// level. All backends should really be of the same type.
39// Create a new backend cluster
40func newBackendCluster(conf *BackendClusterConfig) (*cluster, error) {
41 var err error = nil
42 var rtrn_err = false
43 var be *backend
44 log.Debugf("Creating a backend cluster with %v", conf)
45 // Validate the configuration
46 if conf.Name == "" {
47 log.Error("A backend cluster must have a name")
48 rtrn_err = true
49 }
50 //bc := &cluster{name:conf.Name,backends:make(map[string]*backend)}
51 bc := &cluster{name: conf.Name, backendIDMap: make(map[*backend]int)}
52 clusters[bc.name] = bc
53 idx := 0
54 for _, bec := range conf.Backends {
55 if bec.Name == "" {
56 log.Errorf("A backend must have a name in cluster %s\n", conf.Name)
57 rtrn_err = true
58 }
59 if be, err = newBackend(&bec, conf.Name); err != nil {
60 log.Errorf("Error creating backend %s", bec.Name)
61 rtrn_err = true
62 }
63 bc.backends = append(bc.backends, be)
64 bc.backendIDMap[bc.backends[idx]] = idx
65 idx++
66 }
67 if rtrn_err {
68 return nil, errors.New("Error creating backend(s)")
69 }
70 return bc, nil
71}
72
73func (c *cluster) getBackend(name string) *backend {
74 for _, v := range c.backends {
75 if v.name == name {
76 return v
77 }
78 }
79 return nil
80}
81
82func (c *cluster) allocateSerialNumber() string {
83 return uuid.New().String()
84}
85
86func (c *cluster) nextBackend(be *backend, seq backendSequence) (*backend, error) {
87 switch seq {
88 case BackendSequenceRoundRobin: // Round robin
89 in := be
90 // If no backend is found having a connection
91 // then return nil.
92 if be == nil {
93 log.Debug("Previous backend is nil")
94 be = c.backends[0]
95 in = be
96 if be.NumOpenConnections() != 0 {
97 return be, nil
98 }
99 }
100 for {
101 log.Debugf("Requesting a new backend starting from %s", be.name)
102 cur := c.backendIDMap[be]
103 cur++
104 if cur >= len(c.backends) {
105 cur = 0
106 }
107 log.Debugf("Next backend is %d:%s", cur, c.backends[cur].name)
108 if c.backends[cur].NumOpenConnections() > 0 {
109 return c.backends[cur], nil
110 }
111 if c.backends[cur] == in {
112 err := fmt.Errorf("No backend with open connections found")
113 log.Debug(err)
114 return nil, err
115 }
116 be = c.backends[cur]
117 log.Debugf("Backend '%s' has no open connections, trying next", c.backends[cur].name)
118 }
119 default: // Invalid, default to round robin
120 log.Errorf("Invalid backend sequence %d. Defaulting to round robin", seq)
121 return c.nextBackend(be, BackendSequenceRoundRobin)
122 }
123}
124
125func (c *cluster) handler(srv interface{}, serverStream grpc.ServerStream, r Router, methodInfo methodDetails,
126 mk string, mv string) error {
127 //func (c *cluster) handler(nbR * nbRequest) error {
128
129 // The final backend cluster needs to be determined here. With non-affinity routed backends it could
130 // just be determined here and for affinity routed backends the first message must be received
131 // before the backend is determined. In order to keep things simple, the same approach is taken for
132 // now.
133
134 // Get the backend to use.
135 // Allocate the requestFrame here since it holds the "context" of this communication
136 nf := &requestFrame{router: r, methodInfo: methodInfo, serialNo: c.allocateSerialNumber(), metaKey: mk, metaVal: mv}
137 log.Debugf("Nb frame allocate with method %s", nf.methodInfo.method)
138
139 if be, err := c.assignBackend(serverStream, nf); err != nil {
140 // At this point, no backend streams have been initiated
141 // so just return the error.
142 return err
143 } else {
144 log.Debugf("Backend '%s' selected", be.name)
145 // Allocate a responseFrame here because it might be needed for return value intercept
146 sf := &responseFrame{router: r, backend: be, method: nf.methodInfo.method, metaKey: mk, metaVal: mv}
147 log.Debugf("Sb frame allocated with router %s", r.Name())
148 return be.handler(srv, serverStream, nf, sf)
149 }
150}
151
152func (c *cluster) assignBackend(src grpc.ServerStream, f *requestFrame) (*backend, error) {
153 // Receive the first message from the server. This calls the assigned codec in which
154 // Unmarshal gets executed. That will use the assigned router to select a backend
155 // and add it to the frame
156 if err := src.RecvMsg(f); err != nil {
157 return nil, err
158 }
159 // Check that the backend was routable and actually has connections open.
160 // If it doesn't then return a nil backend to indicate this
161 if f.backend == nil {
162 err := fmt.Errorf("Unable to route method '%s'", f.methodInfo.method)
163 log.Error(err)
164 return nil, err
165 } else if len(f.backend.openConns) == 0 {
166 err := fmt.Errorf("No open connections on backend '%s'", f.backend.name)
167 log.Error(err)
168 return f.backend, err
169 }
170 log.Debugf("Assigned backend %s", f.backend.name)
171 return f.backend, nil
172}