| /* |
| * Copyright 2019-present Open Networking Foundation |
| |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package afrouter |
| |
| import ( |
| "context" |
| "github.com/opencord/voltha-go/common/log" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/connectivity" |
| "sync" |
| "time" |
| ) |
| |
| // connection represents a connection to a single backend |
| type connection struct { |
| mutex sync.Mutex |
| name string |
| addr string |
| port string |
| gConn *gConnection |
| backend *backend |
| } |
| |
| // This structure should never be referred to |
| // by any routine outside of *connection |
| // routines. |
| type gConnection struct { |
| mutex sync.Mutex |
| state connectivity.State |
| conn *grpc.ClientConn |
| cancel context.CancelFunc |
| } |
| |
| func (cn *connection) connect() { |
| if cn.addr != "" && cn.getConn() == nil { |
| log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name, cn.addr, cn.port) |
| // Dial doesn't block, it just returns and continues connecting in the background. |
| // Check back later to confirm and increase the connection count. |
| ctx, cnclFnc := context.WithCancel(context.Background()) // Context for canceling the connection |
| cn.setCancel(cnclFnc) |
| if conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure()); err != nil { |
| log.Errorf("Dialng connection %v:%v", cn, err) |
| cn.waitAndTryAgain(ctx) |
| } else { |
| cn.setConn(conn) |
| log.Debugf("Starting the connection monitor for '%s'", cn.name) |
| cn.monitor(ctx) |
| } |
| } else if cn.addr == "" { |
| log.Infof("No address supplied for connection '%s', not connecting for now", cn.name) |
| } else { |
| log.Debugf("Connection '%s' is already connected, ignoring", cn.name) |
| } |
| } |
| |
| func (cn *connection) waitAndTryAgain(ctx context.Context) { |
| go func(ctx context.Context) { |
| ctxTm, cnclTm := context.WithTimeout(context.Background(), 10*time.Second) |
| select { |
| case <-ctxTm.Done(): |
| cnclTm() |
| log.Debugf("Trying to connect '%s'", cn.name) |
| // Connect creates a new context so cancel this one. |
| cn.cancel() |
| cn.connect() |
| return |
| case <-ctx.Done(): |
| cnclTm() |
| return |
| } |
| }(ctx) |
| } |
| |
| func (cn *connection) cancel() { |
| cn.mutex.Lock() |
| defer cn.mutex.Unlock() |
| log.Debugf("Canceling connection %s", cn.name) |
| if cn.gConn != nil { |
| if cn.gConn.cancel != nil { |
| cn.gConn.cancel() |
| } else { |
| log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name) |
| } |
| } else { |
| log.Errorf("Internal error, attempting to cancel on a nil connection object: '%s'", cn.name) |
| } |
| } |
| |
| func (cn *connection) setCancel(cancel context.CancelFunc) { |
| cn.mutex.Lock() |
| defer cn.mutex.Unlock() |
| if cn.gConn != nil { |
| cn.gConn.cancel = cancel |
| } else { |
| log.Errorf("Internal error, attempting to set a cancel function on a nil connection object: '%s'", cn.name) |
| } |
| } |
| |
| func (cn *connection) setConn(conn *grpc.ClientConn) { |
| cn.mutex.Lock() |
| defer cn.mutex.Unlock() |
| if cn.gConn != nil { |
| cn.gConn.conn = conn |
| } else { |
| log.Errorf("Internal error, attempting to set a connection on a nil connection object: '%s'", cn.name) |
| } |
| } |
| |
| func (cn *connection) getConn() *grpc.ClientConn { |
| cn.mutex.Lock() |
| defer cn.mutex.Unlock() |
| if cn.gConn != nil { |
| return cn.gConn.conn |
| } |
| return nil |
| } |
| |
| func (cn *connection) close() { |
| cn.mutex.Lock() |
| defer cn.mutex.Unlock() |
| log.Debugf("Closing connection %s", cn.name) |
| if cn.gConn != nil && cn.gConn.conn != nil { |
| if cn.gConn.conn.GetState() == connectivity.Ready { |
| cn.backend.decConn() // Decrease the connection reference |
| } |
| if cn.gConn.cancel != nil { |
| cn.gConn.cancel() // Cancel the context first to force monitor functions to exit |
| } else { |
| log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name) |
| } |
| cn.gConn.conn.Close() // Close the connection |
| // Now replace the gConn object with a new one as this one just |
| // fades away as references to it are released after the close |
| // finishes in the background. |
| cn.gConn = &gConnection{conn: nil, cancel: nil, state: connectivity.TransientFailure} |
| } else { |
| log.Errorf("Internal error, attempt to close a nil connection object for '%s'", cn.name) |
| } |
| |
| } |
| |
| func (cn *connection) setState(st connectivity.State) { |
| cn.mutex.Lock() |
| defer cn.mutex.Unlock() |
| if cn.gConn != nil { |
| cn.gConn.state = st |
| } else { |
| log.Errorf("Internal error, attempting to set connection state on a nil connection object: '%s'", cn.name) |
| } |
| } |
| |
| func (cn *connection) getState() connectivity.State { |
| cn.mutex.Lock() |
| defer cn.mutex.Unlock() |
| if cn.gConn != nil { |
| if cn.gConn.conn != nil { |
| return cn.gConn.conn.GetState() |
| } else { |
| log.Errorf("Internal error, attempting to get connection state on a nil connection: '%s'", cn.name) |
| } |
| } else { |
| log.Errorf("Internal error, attempting to get connection state on a nil connection object: '%s'", cn.name) |
| } |
| // For lack of a better state to use. The logs will help determine what happened here. |
| return connectivity.TransientFailure |
| } |
| |
| func (cn *connection) monitor(ctx context.Context) { |
| be := cn.backend |
| log.Debugf("Setting up monitoring for backend %s", be.name) |
| go func(ctx context.Context) { |
| var delay time.Duration = 100 //ms |
| for { |
| //log.Debugf("****** Monitoring connection '%s' on backend '%s', %v", cn.name, be.name, cn.conn) |
| if cn.getState() == connectivity.Ready { |
| log.Debugf("connection '%s' on backend '%s' becomes ready", cn.name, be.name) |
| cn.setState(connectivity.Ready) |
| be.incConn() |
| if cn.getConn() != nil && !cn.getConn().WaitForStateChange(ctx, connectivity.Ready) { |
| // The context was canceled. This is done by the close function |
| // so just exit the routine |
| log.Debugf("Contxt canceled for connection '%s' on backend '%s'", cn.name, be.name) |
| return |
| } |
| if cs := cn.getConn(); cs != nil { |
| switch cs := cn.getState(); cs { |
| case connectivity.TransientFailure: |
| cn.setState(cs) |
| be.decConn() |
| log.Infof("Transient failure for connection '%s' on backend '%s'", cn.name, be.name) |
| delay = 100 |
| case connectivity.Shutdown: |
| //The connection was closed. The assumption here is that the closer |
| // will manage the connection count and setting the conn to nil. |
| // Exit the routine |
| log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, be.name) |
| return |
| case connectivity.Idle: |
| // This can only happen if the server sends a GoAway. This can |
| // only happen if the server has modified MaxConnectionIdle from |
| // its default of infinity. The only solution here is to close the |
| // connection and keepTrying()? |
| //TODO: Read the grpc source code to see if there's a different approach |
| log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, be.name) |
| cn.close() |
| cn.connect() |
| return |
| } |
| } else { // A nil means something went horribly wrong, error and exit. |
| log.Errorf("Somthing horrible happned, the connection is nil and shouldn't be for connection %s", cn.name) |
| return |
| } |
| } else { |
| log.Debugf("Waiting for connection '%s' on backend '%s' to become ready", cn.name, be.name) |
| ctxTm, cnclTm := context.WithTimeout(context.Background(), delay*time.Millisecond) |
| if delay < 30000 { |
| delay += delay |
| } |
| select { |
| case <-ctxTm.Done(): |
| cnclTm() // Doubt this is required but it's harmless. |
| // Do nothing but let the loop continue |
| case <-ctx.Done(): |
| cnclTm() |
| // Context was closed, close and exit routine |
| //cn.close() NO! let the close be managed externally! |
| return |
| } |
| } |
| } |
| }(ctx) |
| } |