blob: fab1052ae66cb19112148c676a108332a85d4762 [file] [log] [blame]
Kent Hagerman1e9061e2019-05-21 16:01:21 -04001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package afrouter
18
19import (
20 "context"
21 "github.com/opencord/voltha-go/common/log"
22 "google.golang.org/grpc"
23 "google.golang.org/grpc/connectivity"
24 "sync"
25 "time"
26)
27
28// connection represents a connection to a single backend
29type connection struct {
30 mutex sync.Mutex
31 name string
32 addr string
33 port string
34 gConn *gConnection
35 backend *backend
36}
37
38// This structure should never be referred to
39// by any routine outside of *connection
40// routines.
41type gConnection struct {
42 mutex sync.Mutex
43 state connectivity.State
44 conn *grpc.ClientConn
45 cancel context.CancelFunc
46}
47
48func (cn *connection) connect() {
49 if cn.addr != "" && cn.getConn() == nil {
50 log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name, cn.addr, cn.port)
51 // Dial doesn't block, it just returns and continues connecting in the background.
52 // Check back later to confirm and increase the connection count.
53 ctx, cnclFnc := context.WithCancel(context.Background()) // Context for canceling the connection
54 cn.setCancel(cnclFnc)
55 if conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure()); err != nil {
56 log.Errorf("Dialng connection %v:%v", cn, err)
57 cn.waitAndTryAgain(ctx)
58 } else {
59 cn.setConn(conn)
60 log.Debugf("Starting the connection monitor for '%s'", cn.name)
61 cn.monitor(ctx)
62 }
63 } else if cn.addr == "" {
64 log.Infof("No address supplied for connection '%s', not connecting for now", cn.name)
65 } else {
66 log.Debugf("Connection '%s' is already connected, ignoring", cn.name)
67 }
68}
69
70func (cn *connection) waitAndTryAgain(ctx context.Context) {
71 go func(ctx context.Context) {
72 ctxTm, cnclTm := context.WithTimeout(context.Background(), 10*time.Second)
73 select {
74 case <-ctxTm.Done():
75 cnclTm()
76 log.Debugf("Trying to connect '%s'", cn.name)
77 // Connect creates a new context so cancel this one.
78 cn.cancel()
79 cn.connect()
80 return
81 case <-ctx.Done():
82 cnclTm()
83 return
84 }
85 }(ctx)
86}
87
88func (cn *connection) cancel() {
89 cn.mutex.Lock()
90 defer cn.mutex.Unlock()
91 log.Debugf("Canceling connection %s", cn.name)
92 if cn.gConn != nil {
93 if cn.gConn.cancel != nil {
94 cn.gConn.cancel()
95 } else {
96 log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
97 }
98 } else {
99 log.Errorf("Internal error, attempting to cancel on a nil connection object: '%s'", cn.name)
100 }
101}
102
103func (cn *connection) setCancel(cancel context.CancelFunc) {
104 cn.mutex.Lock()
105 defer cn.mutex.Unlock()
106 if cn.gConn != nil {
107 cn.gConn.cancel = cancel
108 } else {
109 log.Errorf("Internal error, attempting to set a cancel function on a nil connection object: '%s'", cn.name)
110 }
111}
112
113func (cn *connection) setConn(conn *grpc.ClientConn) {
114 cn.mutex.Lock()
115 defer cn.mutex.Unlock()
116 if cn.gConn != nil {
117 cn.gConn.conn = conn
118 } else {
119 log.Errorf("Internal error, attempting to set a connection on a nil connection object: '%s'", cn.name)
120 }
121}
122
123func (cn *connection) getConn() *grpc.ClientConn {
124 cn.mutex.Lock()
125 defer cn.mutex.Unlock()
126 if cn.gConn != nil {
127 return cn.gConn.conn
128 }
129 return nil
130}
131
132func (cn *connection) close() {
133 cn.mutex.Lock()
134 defer cn.mutex.Unlock()
135 log.Debugf("Closing connection %s", cn.name)
136 if cn.gConn != nil && cn.gConn.conn != nil {
137 if cn.gConn.conn.GetState() == connectivity.Ready {
138 cn.backend.decConn() // Decrease the connection reference
139 }
140 if cn.gConn.cancel != nil {
141 cn.gConn.cancel() // Cancel the context first to force monitor functions to exit
142 } else {
143 log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
144 }
145 cn.gConn.conn.Close() // Close the connection
146 // Now replace the gConn object with a new one as this one just
147 // fades away as references to it are released after the close
148 // finishes in the background.
149 cn.gConn = &gConnection{conn: nil, cancel: nil, state: connectivity.TransientFailure}
150 } else {
151 log.Errorf("Internal error, attempt to close a nil connection object for '%s'", cn.name)
152 }
153
154}
155
156func (cn *connection) setState(st connectivity.State) {
157 cn.mutex.Lock()
158 defer cn.mutex.Unlock()
159 if cn.gConn != nil {
160 cn.gConn.state = st
161 } else {
162 log.Errorf("Internal error, attempting to set connection state on a nil connection object: '%s'", cn.name)
163 }
164}
165
166func (cn *connection) getState() connectivity.State {
167 cn.mutex.Lock()
168 defer cn.mutex.Unlock()
169 if cn.gConn != nil {
170 if cn.gConn.conn != nil {
171 return cn.gConn.conn.GetState()
172 } else {
173 log.Errorf("Internal error, attempting to get connection state on a nil connection: '%s'", cn.name)
174 }
175 } else {
176 log.Errorf("Internal error, attempting to get connection state on a nil connection object: '%s'", cn.name)
177 }
178 // For lack of a better state to use. The logs will help determine what happened here.
179 return connectivity.TransientFailure
180}
181
182func (cn *connection) monitor(ctx context.Context) {
183 be := cn.backend
184 log.Debugf("Setting up monitoring for backend %s", be.name)
185 go func(ctx context.Context) {
186 var delay time.Duration = 100 //ms
187 for {
188 //log.Debugf("****** Monitoring connection '%s' on backend '%s', %v", cn.name, be.name, cn.conn)
189 if cn.getState() == connectivity.Ready {
190 log.Debugf("connection '%s' on backend '%s' becomes ready", cn.name, be.name)
191 cn.setState(connectivity.Ready)
192 be.incConn()
193 if cn.getConn() != nil && !cn.getConn().WaitForStateChange(ctx, connectivity.Ready) {
194 // The context was canceled. This is done by the close function
195 // so just exit the routine
196 log.Debugf("Contxt canceled for connection '%s' on backend '%s'", cn.name, be.name)
197 return
198 }
199 if cs := cn.getConn(); cs != nil {
200 switch cs := cn.getState(); cs {
201 case connectivity.TransientFailure:
202 cn.setState(cs)
203 be.decConn()
204 log.Infof("Transient failure for connection '%s' on backend '%s'", cn.name, be.name)
205 delay = 100
206 case connectivity.Shutdown:
207 //The connection was closed. The assumption here is that the closer
208 // will manage the connection count and setting the conn to nil.
209 // Exit the routine
210 log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, be.name)
211 return
212 case connectivity.Idle:
213 // This can only happen if the server sends a GoAway. This can
214 // only happen if the server has modified MaxConnectionIdle from
215 // its default of infinity. The only solution here is to close the
216 // connection and keepTrying()?
217 //TODO: Read the grpc source code to see if there's a different approach
218 log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, be.name)
219 cn.close()
220 cn.connect()
221 return
222 }
223 } else { // A nil means something went horribly wrong, error and exit.
224 log.Errorf("Somthing horrible happned, the connection is nil and shouldn't be for connection %s", cn.name)
225 return
226 }
227 } else {
228 log.Debugf("Waiting for connection '%s' on backend '%s' to become ready", cn.name, be.name)
229 ctxTm, cnclTm := context.WithTimeout(context.Background(), delay*time.Millisecond)
230 if delay < 30000 {
231 delay += delay
232 }
233 select {
234 case <-ctxTm.Done():
235 cnclTm() // Doubt this is required but it's harmless.
236 // Do nothing but let the loop continue
237 case <-ctx.Done():
238 cnclTm()
239 // Context was closed, close and exit routine
240 //cn.close() NO! let the close be managed externally!
241 return
242 }
243 }
244 }
245 }(ctx)
246}