Reworked connection to use a single thread for state management.
Also disabled the SetConnection API call.
Stream cleanup.
Removed Unnescessary threads, there is now one thread per connection (handling response stream forwarding), and the existing thread is used to forward the request stream.
Renamed 'streams' to 'request'.
Renamed 'nbFrame' to 'requestFrame'.
Renamed 'sbFrame' to 'responseFrame'.
Changed handling of streaming requests.
Incoming & Outgoing streams are split when a connection becomes ready.
Added playback of non-streaming requests/responses for newly opened streams.
Late stream catchup fix & streaming call detection.
Fixed an issue where old streams were not being caught up with what they missed.
Streaming requests & responses are now detected based on the proto definitions.
Changed where the proto file is specified in the afrouter config (see afrouter/arouter.json for an example).
Fixed mutex copy.
Also tweaked some log statements.
Fixed field tag lint error.
Change-Id: I6e14039c27519d8d2103065258ff4302bc881235
(cherry picked from commit 03b58999ad8ce39d1c61af5cc62bfdeccd04be3a)
diff --git a/afrouter/afrouter/affinity-router.go b/afrouter/afrouter/affinity-router.go
index 2390846..583d0a7 100644
--- a/afrouter/afrouter/affinity-router.go
+++ b/afrouter/afrouter/affinity-router.go
@@ -20,10 +20,8 @@
"errors"
"fmt"
"github.com/golang/protobuf/proto"
- pb "github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/opencord/voltha-go/common/log"
"google.golang.org/grpc"
- "io/ioutil"
"regexp"
"strconv"
)
@@ -38,7 +36,6 @@
association associationType
routingField string
grpcService string
- protoDescriptor *pb.FileDescriptorSet
methodMap map[string]byte
nbBindingMethodMap map[string]byte
cluster *cluster
@@ -94,37 +91,23 @@
rtrn_err = true
}
- // Load the protobuf descriptor file
- dr.protoDescriptor = &pb.FileDescriptorSet{}
- fb, err := ioutil.ReadFile(config.ProtoFile)
- if err != nil {
- log.Errorf("Could not open proto file '%s'", config.ProtoFile)
- rtrn_err = true
- }
- err = proto.Unmarshal(fb, dr.protoDescriptor)
- if err != nil {
- log.Errorf("Could not unmarshal %s, %v", "proto.pb", err)
- rtrn_err = true
- }
-
// Build the routing structure based on the loaded protobuf
// descriptor file and the config information.
type key struct {
method string
field string
}
- var msgs = make(map[key]byte)
- for _, f := range dr.protoDescriptor.File {
+ var fieldNumberLookup = make(map[key]byte)
+ for _, f := range rconf.protoDescriptor.File {
// Build a temporary map of message types by name.
for _, m := range f.MessageType {
for _, fld := range m.Field {
log.Debugf("Processing message '%s', field '%s'", *m.Name, *fld.Name)
- msgs[key{*m.Name, *fld.Name}] = byte(*fld.Number)
+ fieldNumberLookup[key{*m.Name, *fld.Name}] = byte(*fld.Number)
}
}
}
- log.Debugf("The map contains: %v", msgs)
- for _, f := range dr.protoDescriptor.File {
+ for _, f := range rconf.protoDescriptor.File {
if *f.Package == rconf.ProtoPackage {
for _, s := range f.Service {
if *s.Name == rconf.ProtoService {
@@ -147,7 +130,7 @@
// The input type has the package name prepended to it. Remove it.
//in := (*m.InputType)[len(rconf.ProtoPackage)+2:]
in := pkg_methd[PKG_MTHD_MTHD]
- dr.methodMap[*m.Name], ok = msgs[key{in, config.RouteField}]
+ dr.methodMap[*m.Name], ok = fieldNumberLookup[key{in, config.RouteField}]
if !ok {
log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
*m.Name, config.RouteField, in)
@@ -155,11 +138,11 @@
}
}
// The sb method is always included in the methods so we can check it here too.
- if needSbMethod(*m.Name, config) {
+ if needNbBindingMethod(*m.Name, config) {
log.Debugf("Enabling southbound method '%s'", *m.Name)
// The output type has the package name prepended to it. Remove it.
out := (*m.OutputType)[len(rconf.ProtoPackage)+2:]
- dr.nbBindingMethodMap[*m.Name], ok = msgs[key{out, config.RouteField}]
+ dr.nbBindingMethodMap[*m.Name], ok = fieldNumberLookup[key{out, config.RouteField}]
if !ok {
log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
*m.Name, config.RouteField, out)
@@ -188,7 +171,7 @@
return dr, nil
}
-func needSbMethod(mthd string, conf *RouteConfig) bool {
+func needNbBindingMethod(mthd string, conf *RouteConfig) bool {
for _, m := range conf.NbBindingMethods {
if mthd == m {
return true
@@ -284,8 +267,8 @@
func (ar AffinityRouter) Route(sel interface{}) *backend {
switch sl := sel.(type) {
- case *nbFrame:
- log.Debugf("Route called for nbFrame with method %s", sl.methodInfo.method)
+ case *requestFrame:
+ log.Debugf("Route called for requestFrame with method %s", sl.methodInfo.method)
// Check if this method should be affinity bound from the
// reply rather than the request.
if _, ok := ar.nbBindingMethodMap[sl.methodInfo.method]; ok {
@@ -331,6 +314,10 @@
return "", "", nil
}
+func (ar AffinityRouter) IsStreaming(_ string) (bool, bool) {
+ panic("not implemented")
+}
+
func (ar AffinityRouter) BackendCluster(mthd string, metaKey string) (*cluster, error) {
return ar.cluster, nil
}
@@ -344,10 +331,8 @@
func (ar AffinityRouter) ReplyHandler(sel interface{}) error {
switch sl := sel.(type) {
- case *sbFrame:
- sl.mutex.Lock()
- defer sl.mutex.Unlock()
- log.Debugf("Reply handler called for sbFrame with method %s", sl.method)
+ case *responseFrame:
+ log.Debugf("Reply handler called for responseFrame with method %s", sl.method)
// Determine if reply action is required.
if fld, ok := ar.nbBindingMethodMap[sl.method]; ok && len(sl.payload) > 0 {
// Extract the field value from the frame and
diff --git a/afrouter/afrouter/api.go b/afrouter/afrouter/api.go
index 87f5573..3e47ef8 100644
--- a/afrouter/afrouter/api.go
+++ b/afrouter/afrouter/api.go
@@ -24,6 +24,7 @@
"golang.org/x/net/context"
"google.golang.org/grpc"
"net"
+ "net/url"
"runtime"
"strconv"
)
@@ -41,7 +42,7 @@
var rtrn_err bool
// Create a seperate server and listener for the API
// Validate the ip address if one is provided
- if ip := net.ParseIP(config.Addr); config.Addr != "" && ip == nil {
+ if _, err := url.Parse(config.Addr); err != nil {
log.Errorf("Invalid address '%s' provided for API server", config.Addr)
rtrn_err = true
}
@@ -118,17 +119,7 @@
}
func (aa *ArouterApi) updateConnection(in *pb.Conn, cn *connection, b *backend) error {
- sPort := strconv.FormatUint(in.Port, 10)
- // Check that the ip address and or port are different
- if in.Addr == cn.addr && sPort == cn.port {
- err := errors.New(fmt.Sprintf("Refusing to change connection '%s' to identical values", in.Connection))
- return err
- }
- cn.close()
- cn.addr = in.Addr
- cn.port = sPort
- cn.connect()
- return nil
+ return errors.New("updateConnection not implemented")
}
func (aa ArouterApi) SetAffinity(ctx context.Context, in *pb.Affinity) (*pb.Result, error) {
diff --git a/afrouter/afrouter/backend.go b/afrouter/afrouter/backend.go
index 31f431a..3c7815d 100644
--- a/afrouter/afrouter/backend.go
+++ b/afrouter/afrouter/backend.go
@@ -25,9 +25,7 @@
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/metadata"
- "io"
"net/url"
"strconv"
"strings"
@@ -42,7 +40,8 @@
activeAssociation association
connFailCallback func(string, *backend) bool
connections map[string]*connection
- openConns int
+ openConns map[*connection]*grpc.ClientConn
+ activeRequests map[*request]struct{}
}
type association struct {
@@ -52,138 +51,101 @@
key string
}
-func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f *nbFrame) (*streams, error) {
+// splitActiveStreamsUnsafe expects the caller to have already locked the backend mutex
+func (be *backend) splitActiveStreamsUnsafe(cn *connection, conn *grpc.ClientConn) {
+ if len(be.activeRequests) != 0 {
+ log.Debugf("Creating new streams for %d existing requests", len(be.activeRequests))
+ }
+ for r := range be.activeRequests {
+ r.mutex.Lock()
+ if _, have := r.streams[cn.name]; !have {
+ log.Debugf("Opening southbound stream for existing request '%s'", r.methodInfo.method)
+ if stream, err := grpc.NewClientStream(r.ctx, clientStreamDescForProxying, conn, r.methodInfo.all); err != nil {
+ log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
+ } else {
+ go r.catchupRequestStreamThenForwardResponseStream(cn.name, stream)
+ // new thread will unlock the request mutex
+ continue
+ }
+ }
+ r.mutex.Unlock()
+ }
+}
- rtrn := &streams{streams: make(map[string]*stream), activeStream: nil}
+// openSouthboundStreams sets up a connection to each southbound frame
+func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, nf *requestFrame, sf *responseFrame) (*request, error) {
+ be.mutex.Lock()
+ defer be.mutex.Unlock()
- log.Debugf("Opening southbound streams for method '%s'", f.methodInfo.method)
+ isStreamingRequest, isStreamingResponse := nf.router.IsStreaming(nf.methodInfo.method)
+
// Get the metadata from the incoming message on the server
md, ok := metadata.FromIncomingContext(serverStream.Context())
if !ok {
- return nil, errors.New("Could not get a server stream metadata")
+ return nil, errors.New("could not get a server stream metadata")
}
+ r := &request{
+ // Create an outgoing context that includes the incoming metadata and that will cancel if the server's context is canceled
+ ctx: metadata.AppendToOutgoingContext(metadata.NewOutgoingContext(serverStream.Context(), md.Copy()), "voltha_serial_number", strconv.FormatUint(nf.serialNo, 10)),
+
+ streams: make(map[string]grpc.ClientStream),
+ responseErrChan: make(chan error, 1),
+
+ backend: be,
+ serverStream: serverStream,
+ methodInfo: nf.methodInfo,
+ requestFrame: nf,
+ responseFrame: sf,
+ isStreamingRequest: isStreamingRequest,
+ isStreamingResponse: isStreamingResponse,
+ }
+
+ log.Debugf("Opening southbound request for method '%s'", nf.methodInfo.method)
+
// TODO: Need to check if this is an active/active backend cluster
// with a serial number in the header.
- log.Debugf("Serial number for transaction allocated: %d", f.serialNo)
+ log.Debugf("Serial number for transaction allocated: %d", nf.serialNo)
// If even one stream can be created then proceed. If none can be
- // created then report an error becase both the primary and redundant
- // connections are non-existant.
+ // created then report an error because both the primary and redundant
+ // connections are non-existent.
var atLeastOne = false
var errStr strings.Builder
- log.Debugf("There are %d connections to open", len(be.connections))
- for _, cn := range be.connections {
- // Copy in the metadata
- if cn.getState() == connectivity.Ready && cn.getConn() != nil {
- log.Debugf("Opening southbound stream for connection '%s'", cn.name)
- // Create an outgoing context that includes the incoming metadata
- // and that will cancel if the server's context is canceled
- clientCtx, clientCancel := context.WithCancel(serverStream.Context())
- clientCtx = metadata.NewOutgoingContext(clientCtx, md.Copy())
- //TODO: Same check here, only add the serial number if necessary
- clientCtx = metadata.AppendToOutgoingContext(clientCtx, "voltha_serial_number",
- strconv.FormatUint(f.serialNo, 10))
- // Create the client stream
- if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying,
- cn.getConn(), f.methodInfo.all); err != nil {
- log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
- fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err)
- rtrn.streams[cn.name] = nil
- } else {
- rtrn.streams[cn.name] = &stream{stream: clientStream, ctx: clientCtx,
- cancel: clientCancel, s2cReturn: nil,
- ok2Close: make(chan struct{}),
- c2sReturn: make(chan error, 1)}
- atLeastOne = true
- }
- } else if cn.getConn() == nil {
- err := errors.New(fmt.Sprintf("Connection '%s' is closed", cn.name))
- fmt.Fprint(&errStr, err.Error())
- log.Debug(err)
+ log.Debugf("There are %d/%d streams to open", len(be.openConns), len(be.connections))
+ for cn, conn := range be.openConns {
+ log.Debugf("Opening stream for connection '%s'", cn.name)
+ if stream, err := grpc.NewClientStream(r.ctx, clientStreamDescForProxying, conn, r.methodInfo.all); err != nil {
+ log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
} else {
- err := errors.New(fmt.Sprintf("Connection '%s' isn't ready", cn.name))
- fmt.Fprint(&errStr, err.Error())
- log.Debug(err)
+ r.streams[cn.name] = stream
+ go r.forwardResponseStream(cn.name, stream)
+ atLeastOne = true
}
}
if atLeastOne {
- rtrn.sortStreams()
- return rtrn, nil
+ be.activeRequests[r] = struct{}{}
+ return r, nil
}
- fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ", be.name)
+ fmt.Fprintf(&errStr, "{{No open connections for backend '%s' unable to send}} ", be.name)
log.Error(errStr.String())
return nil, errors.New(errStr.String())
}
-func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *nbFrame, sf *sbFrame) error {
-
- // Set up and launch each individual southbound stream
- var beStrms *streams
- var rtrn error = nil
- var s2cOk = false
- var c2sOk = false
-
- beStrms, err := be.openSouthboundStreams(srv, serverStream, nf)
+func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *requestFrame, sf *responseFrame) error {
+ // Set up streams for each open connection
+ request, err := be.openSouthboundStreams(srv, serverStream, nf, sf)
if err != nil {
log.Errorf("openStreams failed: %v", err)
return err
}
- // If we get here, there has to be AT LEAST ONE open stream
- // *Do not explicitly close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
- // Channels do not have to be closed, it is just a control flow mechanism, see
- // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
-
- log.Debug("Starting server to client forwarding")
- s2cErrChan := beStrms.forwardServerToClient(serverStream, nf)
-
- log.Debug("Starting client to server forwarding")
- c2sErrChan := beStrms.forwardClientToServer(serverStream, sf)
-
- // We don't know which side is going to stop sending first, so we need a select between the two.
- for i := 0; i < 2; i++ {
- select {
- case s2cErr := <-s2cErrChan:
- s2cOk = true
- log.Debug("Processing s2cErr")
- if s2cErr == io.EOF {
- log.Debug("s2cErr reporting EOF")
- // this is the successful case where the sender has encountered io.EOF, and won't be sending anymore./
- // the clientStream>serverStream may continue sending though.
- defer beStrms.closeSend()
- if c2sOk {
- return rtrn
- }
- } else {
- log.Debugf("s2cErr reporting %v", s2cErr)
- // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
- // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
- // exit with an error to the stack
- beStrms.clientCancel()
- return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
- }
- case c2sErr := <-c2sErrChan:
- c2sOk = true
- log.Debug("Processing c2sErr")
- // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
- // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
- // will be nil.
- serverStream.SetTrailer(beStrms.trailer())
- // c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
- // NOTE!!! with redundant backends, it's likely that one of the backends
- // returns a response before all the data has been sent southbound and
- // the southbound streams are closed. Should this happen one of the
- // backends may not get the request.
- if c2sErr != io.EOF {
- rtrn = c2sErr
- }
- log.Debug("c2sErr reporting EOF")
- if s2cOk {
- return rtrn
- }
- }
+ log.Debug("Starting request stream forwarding")
+ if s2cErr := request.forwardRequestStream(serverStream); s2cErr != nil {
+ // exit with an error to the stack
+ return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
}
- return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
+ // wait for response stream to complete
+ return <-request.responseErrChan
}
func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
@@ -191,7 +153,12 @@
log.Debugf("Configuring the backend with %v", *conf)
// Validate the conifg and configure the backend
- be := &backend{name: conf.Name, connections: make(map[string]*connection), openConns: 0}
+ be := &backend{
+ name: conf.Name,
+ connections: make(map[string]*connection),
+ openConns: make(map[*connection]*grpc.ClientConn),
+ activeRequests: make(map[*request]struct{}),
+ }
if conf.Type == BackendUndefined {
log.Error("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName)
rtrn_err = true
@@ -248,25 +215,23 @@
log.Errorf("A connection must have a name for backend %s in cluster %s",
conf.Name, clusterName)
} else {
- gc := &gConnection{conn: nil, cancel: nil, state: connectivity.Idle}
- be.connections[cnConf.Name] = &connection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, backend: be, gConn: gc}
- if cnConf.Addr != "" { // This connection will be specified later.
- if _, err := url.Parse(cnConf.Addr); err != nil {
- log.Errorf("The address for connection %s in backend %s in cluster %s is invalid: %s",
- cnConf.Name, conf.Name, clusterName, err)
- rtrn_err = true
- }
- // Validate the port number. This just validtes that it's a non 0 integer
- if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
+ ctx, cancelFunc := context.WithCancel(context.Background())
+ be.connections[cnConf.Name] = &connection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, backend: be, ctx: ctx, close: cancelFunc}
+ if _, err := url.Parse(cnConf.Addr); err != nil {
+ log.Errorf("The address for connection %s in backend %s in cluster %s is invalid: %s",
+ cnConf.Name, conf.Name, clusterName, err)
+ rtrn_err = true
+ }
+ // Validate the port number. This just validtes that it's a non 0 integer
+ if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
+ log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
+ cnConf.Port, cnConf.Name, conf.Name, clusterName)
+ rtrn_err = true
+ } else {
+ if n <= 0 && n > 65535 {
log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
cnConf.Port, cnConf.Name, conf.Name, clusterName)
rtrn_err = true
- } else {
- if n <= 0 && n > 65535 {
- log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
- cnConf.Port, cnConf.Name, conf.Name, clusterName)
- rtrn_err = true
- }
}
}
}
@@ -281,20 +246,26 @@
return be, nil
}
-func (be *backend) incConn() {
+func (be *backend) incConn(cn *connection, conn *grpc.ClientConn) {
be.mutex.Lock()
defer be.mutex.Unlock()
- be.openConns++
+
+ be.openConns[cn] = conn
+ be.splitActiveStreamsUnsafe(cn, conn)
}
-func (be *backend) decConn() {
+func (be *backend) decConn(cn *connection) {
be.mutex.Lock()
defer be.mutex.Unlock()
- be.openConns--
- if be.openConns < 0 {
- log.Error("Internal error, number of open connections less than 0")
- be.openConns = 0
- }
+
+ delete(be.openConns, cn)
+}
+
+func (be *backend) NumOpenConnections() int {
+ be.mutex.Lock()
+ defer be.mutex.Unlock()
+
+ return len(be.openConns)
}
// Attempts to establish all the connections for a backend
@@ -303,7 +274,7 @@
// handled after that.
func (be *backend) connectAll() {
for _, cn := range be.connections {
- cn.connect()
+ go cn.connect()
}
}
diff --git a/afrouter/afrouter/binding-router.go b/afrouter/afrouter/binding-router.go
index 67a7a7e..dd73756 100644
--- a/afrouter/afrouter/binding-router.go
+++ b/afrouter/afrouter/binding-router.go
@@ -39,6 +39,10 @@
currentBackend **backend
}
+func (br BindingRouter) IsStreaming(_ string) (bool, bool) {
+ panic("not implemented")
+}
+
func (br BindingRouter) BackendCluster(s string, metaKey string) (*cluster, error) {
return br.beCluster, nil
//return nil,errors.New("Not implemented yet")
@@ -79,7 +83,7 @@
func (br BindingRouter) Route(sel interface{}) *backend {
var err error
switch sl := sel.(type) {
- case *nbFrame:
+ case *requestFrame:
if b, ok := br.bindings[sl.metaVal]; ok == true { // binding exists, just return it
return b
} else { // establish a new binding or error.
diff --git a/afrouter/afrouter/cluster.go b/afrouter/afrouter/cluster.go
index 879246b..e33db67 100644
--- a/afrouter/afrouter/cluster.go
+++ b/afrouter/afrouter/cluster.go
@@ -94,7 +94,7 @@
log.Debug("Previous backend is nil")
be = c.backends[0]
in = be
- if be.openConns != 0 {
+ if be.NumOpenConnections() != 0 {
return be, nil
}
}
@@ -106,7 +106,7 @@
cur = 0
}
log.Debugf("Next backend is %d:%s", cur, c.backends[cur].name)
- if c.backends[cur].openConns > 0 {
+ if c.backends[cur].NumOpenConnections() > 0 {
return c.backends[cur], nil
}
if c.backends[cur] == in {
@@ -133,8 +133,8 @@
// now.
// Get the backend to use.
- // Allocate the nbFrame here since it holds the "context" of this communication
- nf := &nbFrame{router: r, methodInfo: methodInfo, serialNo: c.allocateSerialNumber(), metaKey: mk, metaVal: mv}
+ // Allocate the requestFrame here since it holds the "context" of this communication
+ nf := &requestFrame{router: r, methodInfo: methodInfo, serialNo: c.allocateSerialNumber(), metaKey: mk, metaVal: mv}
log.Debugf("Nb frame allocate with method %s", nf.methodInfo.method)
if be, err := c.assignBackend(serverStream, nf); err != nil {
@@ -143,14 +143,14 @@
return err
} else {
log.Debugf("Backend '%s' selected", be.name)
- // Allocate a sbFrame here because it might be needed for return value intercept
- sf := &sbFrame{router: r, backend: be, method: nf.methodInfo.method, metaKey: mk, metaVal: mv}
+ // Allocate a responseFrame here because it might be needed for return value intercept
+ sf := &responseFrame{router: r, backend: be, method: nf.methodInfo.method, metaKey: mk, metaVal: mv}
log.Debugf("Sb frame allocated with router %s", r.Name())
return be.handler(srv, serverStream, nf, sf)
}
}
-func (c *cluster) assignBackend(src grpc.ServerStream, f *nbFrame) (*backend, error) {
+func (c *cluster) assignBackend(src grpc.ServerStream, f *requestFrame) (*backend, error) {
// Receive the first message from the server. This calls the assigned codec in which
// Unmarshal gets executed. That will use the assigned router to select a backend
// and add it to the frame
@@ -163,7 +163,7 @@
err := fmt.Errorf("Unable to route method '%s'", f.methodInfo.method)
log.Error(err)
return nil, err
- } else if f.backend.openConns == 0 {
+ } else if len(f.backend.openConns) == 0 {
err := fmt.Errorf("No open connections on backend '%s'", f.backend.name)
log.Error(err)
return f.backend, err
diff --git a/afrouter/afrouter/codec.go b/afrouter/afrouter/codec.go
index 675320a..7ea6ef2 100644
--- a/afrouter/afrouter/codec.go
+++ b/afrouter/afrouter/codec.go
@@ -21,7 +21,6 @@
"github.com/golang/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"google.golang.org/grpc"
- "sync"
)
func Codec() grpc.Codec {
@@ -36,19 +35,18 @@
parentCodec grpc.Codec
}
-// sbFrame is a frame being "returned" to whomever established the connection
-type sbFrame struct {
+// responseFrame is a frame being "returned" to whomever established the connection
+type responseFrame struct {
payload []byte
router Router
method string
backend *backend
- mutex sync.Mutex
metaKey string
metaVal string
}
-// nbFrame is a frame coming in from whomever established the connection
-type nbFrame struct {
+// requestFrame is a frame coming in from whomever established the connection
+type requestFrame struct {
payload []byte
router Router
backend *backend
@@ -61,9 +59,9 @@
func (cdc *transparentRoutingCodec) Marshal(v interface{}) ([]byte, error) {
switch t := v.(type) {
- case *sbFrame:
+ case *responseFrame:
return t.payload, nil
- case *nbFrame:
+ case *requestFrame:
return t.payload, nil
default:
return cdc.parentCodec.Marshal(v)
@@ -72,17 +70,21 @@
func (cdc *transparentRoutingCodec) Unmarshal(data []byte, v interface{}) error {
switch t := v.(type) {
- case *sbFrame:
+ case *responseFrame:
t.payload = data
// This is where the affinity is established on a northbound response
t.router.ReplyHandler(v)
return nil
- case *nbFrame:
+ case *requestFrame:
t.payload = data
// This is were the afinity value is pulled from the payload
// and the backend selected.
t.backend = t.router.Route(v)
- log.Debugf("Routing returned %v for method %s", t.backend, t.methodInfo.method)
+ name := "<nil>"
+ if t.backend != nil {
+ name = t.backend.name
+ }
+ log.Debugf("Routing returned %s for method %s", name, t.methodInfo.method)
return nil
default:
return cdc.parentCodec.Unmarshal(data, v)
diff --git a/afrouter/afrouter/config.go b/afrouter/afrouter/config.go
index 1f0be7b..6008199 100644
--- a/afrouter/afrouter/config.go
+++ b/afrouter/afrouter/config.go
@@ -22,6 +22,7 @@
"errors"
"flag"
"fmt"
+ "github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/opencord/voltha-go/common/log"
"io/ioutil"
"os"
@@ -62,16 +63,17 @@
}
type RouterConfig struct {
- Name string `json:"name"`
- ProtoService string `json:"service"`
- ProtoPackage string `json:"package"`
- Routes []RouteConfig `json:"routes"`
+ Name string `json:"name"`
+ ProtoService string `json:"service"`
+ ProtoPackage string `json:"package"`
+ ProtoFile string `json:"proto_descriptor"`
+ Routes []RouteConfig `json:"routes"`
+ protoDescriptor descriptor.FileDescriptorSet
}
type RouteConfig struct {
Name string `json:"name"`
Type routeType `json:"type"`
- ProtoFile string `json:"proto_descriptor"`
Association associationType `json:"association"`
RouteField string `json:"routing_field"`
Methods []string `json:"methods"` // The GRPC methods to route using the route field
diff --git a/afrouter/afrouter/connection.go b/afrouter/afrouter/connection.go
index fab1052..dcdb8d6 100644
--- a/afrouter/afrouter/connection.go
+++ b/afrouter/afrouter/connection.go
@@ -21,226 +21,82 @@
"github.com/opencord/voltha-go/common/log"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
- "sync"
"time"
)
// connection represents a connection to a single backend
type connection struct {
- mutex sync.Mutex
+ backend *backend
name string
addr string
port string
- gConn *gConnection
- backend *backend
-}
-
-// This structure should never be referred to
-// by any routine outside of *connection
-// routines.
-type gConnection struct {
- mutex sync.Mutex
- state connectivity.State
- conn *grpc.ClientConn
- cancel context.CancelFunc
+ ctx context.Context
+ close context.CancelFunc
}
func (cn *connection) connect() {
- if cn.addr != "" && cn.getConn() == nil {
- log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name, cn.addr, cn.port)
+ for {
+ log.Infof("Connecting to %s with addr: %s and port %s", cn.name, cn.addr, cn.port)
// Dial doesn't block, it just returns and continues connecting in the background.
// Check back later to confirm and increase the connection count.
- ctx, cnclFnc := context.WithCancel(context.Background()) // Context for canceling the connection
- cn.setCancel(cnclFnc)
- if conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure()); err != nil {
- log.Errorf("Dialng connection %v:%v", cn, err)
- cn.waitAndTryAgain(ctx)
- } else {
- cn.setConn(conn)
- log.Debugf("Starting the connection monitor for '%s'", cn.name)
- cn.monitor(ctx)
- }
- } else if cn.addr == "" {
- log.Infof("No address supplied for connection '%s', not connecting for now", cn.name)
- } else {
- log.Debugf("Connection '%s' is already connected, ignoring", cn.name)
- }
-}
-func (cn *connection) waitAndTryAgain(ctx context.Context) {
- go func(ctx context.Context) {
- ctxTm, cnclTm := context.WithTimeout(context.Background(), 10*time.Second)
+ var err error
+ conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure(), grpc.WithBackoffMaxDelay(time.Second*15))
+ if err != nil {
+ log.Fatalf("Dialing connection %v:%v", cn, err)
+ }
+
+ log.Debugf("Starting the connection monitor for '%s'", cn.name)
+ cn.monitor(conn)
+ conn.Close()
+
select {
- case <-ctxTm.Done():
- cnclTm()
- log.Debugf("Trying to connect '%s'", cn.name)
- // Connect creates a new context so cancel this one.
- cn.cancel()
- cn.connect()
+ case <-cn.ctx.Done():
return
- case <-ctx.Done():
- cnclTm()
- return
+ default:
}
- }(ctx)
-}
-
-func (cn *connection) cancel() {
- cn.mutex.Lock()
- defer cn.mutex.Unlock()
- log.Debugf("Canceling connection %s", cn.name)
- if cn.gConn != nil {
- if cn.gConn.cancel != nil {
- cn.gConn.cancel()
- } else {
- log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
- }
- } else {
- log.Errorf("Internal error, attempting to cancel on a nil connection object: '%s'", cn.name)
}
}
-func (cn *connection) setCancel(cancel context.CancelFunc) {
- cn.mutex.Lock()
- defer cn.mutex.Unlock()
- if cn.gConn != nil {
- cn.gConn.cancel = cancel
- } else {
- log.Errorf("Internal error, attempting to set a cancel function on a nil connection object: '%s'", cn.name)
- }
-}
-
-func (cn *connection) setConn(conn *grpc.ClientConn) {
- cn.mutex.Lock()
- defer cn.mutex.Unlock()
- if cn.gConn != nil {
- cn.gConn.conn = conn
- } else {
- log.Errorf("Internal error, attempting to set a connection on a nil connection object: '%s'", cn.name)
- }
-}
-
-func (cn *connection) getConn() *grpc.ClientConn {
- cn.mutex.Lock()
- defer cn.mutex.Unlock()
- if cn.gConn != nil {
- return cn.gConn.conn
- }
- return nil
-}
-
-func (cn *connection) close() {
- cn.mutex.Lock()
- defer cn.mutex.Unlock()
- log.Debugf("Closing connection %s", cn.name)
- if cn.gConn != nil && cn.gConn.conn != nil {
- if cn.gConn.conn.GetState() == connectivity.Ready {
- cn.backend.decConn() // Decrease the connection reference
- }
- if cn.gConn.cancel != nil {
- cn.gConn.cancel() // Cancel the context first to force monitor functions to exit
- } else {
- log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
- }
- cn.gConn.conn.Close() // Close the connection
- // Now replace the gConn object with a new one as this one just
- // fades away as references to it are released after the close
- // finishes in the background.
- cn.gConn = &gConnection{conn: nil, cancel: nil, state: connectivity.TransientFailure}
- } else {
- log.Errorf("Internal error, attempt to close a nil connection object for '%s'", cn.name)
- }
-
-}
-
-func (cn *connection) setState(st connectivity.State) {
- cn.mutex.Lock()
- defer cn.mutex.Unlock()
- if cn.gConn != nil {
- cn.gConn.state = st
- } else {
- log.Errorf("Internal error, attempting to set connection state on a nil connection object: '%s'", cn.name)
- }
-}
-
-func (cn *connection) getState() connectivity.State {
- cn.mutex.Lock()
- defer cn.mutex.Unlock()
- if cn.gConn != nil {
- if cn.gConn.conn != nil {
- return cn.gConn.conn.GetState()
- } else {
- log.Errorf("Internal error, attempting to get connection state on a nil connection: '%s'", cn.name)
- }
- } else {
- log.Errorf("Internal error, attempting to get connection state on a nil connection object: '%s'", cn.name)
- }
- // For lack of a better state to use. The logs will help determine what happened here.
- return connectivity.TransientFailure
-}
-
-func (cn *connection) monitor(ctx context.Context) {
+func (cn *connection) monitor(conn *grpc.ClientConn) {
be := cn.backend
log.Debugf("Setting up monitoring for backend %s", be.name)
- go func(ctx context.Context) {
- var delay time.Duration = 100 //ms
- for {
- //log.Debugf("****** Monitoring connection '%s' on backend '%s', %v", cn.name, be.name, cn.conn)
- if cn.getState() == connectivity.Ready {
- log.Debugf("connection '%s' on backend '%s' becomes ready", cn.name, be.name)
- cn.setState(connectivity.Ready)
- be.incConn()
- if cn.getConn() != nil && !cn.getConn().WaitForStateChange(ctx, connectivity.Ready) {
- // The context was canceled. This is done by the close function
- // so just exit the routine
- log.Debugf("Contxt canceled for connection '%s' on backend '%s'", cn.name, be.name)
- return
- }
- if cs := cn.getConn(); cs != nil {
- switch cs := cn.getState(); cs {
- case connectivity.TransientFailure:
- cn.setState(cs)
- be.decConn()
- log.Infof("Transient failure for connection '%s' on backend '%s'", cn.name, be.name)
- delay = 100
- case connectivity.Shutdown:
- //The connection was closed. The assumption here is that the closer
- // will manage the connection count and setting the conn to nil.
- // Exit the routine
- log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, be.name)
- return
- case connectivity.Idle:
- // This can only happen if the server sends a GoAway. This can
- // only happen if the server has modified MaxConnectionIdle from
- // its default of infinity. The only solution here is to close the
- // connection and keepTrying()?
- //TODO: Read the grpc source code to see if there's a different approach
- log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, be.name)
- cn.close()
- cn.connect()
- return
- }
- } else { // A nil means something went horribly wrong, error and exit.
- log.Errorf("Somthing horrible happned, the connection is nil and shouldn't be for connection %s", cn.name)
- return
- }
- } else {
- log.Debugf("Waiting for connection '%s' on backend '%s' to become ready", cn.name, be.name)
- ctxTm, cnclTm := context.WithTimeout(context.Background(), delay*time.Millisecond)
- if delay < 30000 {
- delay += delay
- }
- select {
- case <-ctxTm.Done():
- cnclTm() // Doubt this is required but it's harmless.
- // Do nothing but let the loop continue
- case <-ctx.Done():
- cnclTm()
- // Context was closed, close and exit routine
- //cn.close() NO! let the close be managed externally!
- return
- }
+ state := connectivity.Idle
+monitorLoop:
+ for {
+ if !conn.WaitForStateChange(cn.ctx, state) {
+ log.Debugf("Context canceled for connection '%s' on backend '%s'", cn.name, be.name)
+ break monitorLoop // connection closed
+ }
+
+ if newState := conn.GetState(); newState != state {
+ previousState := state
+ state = newState
+
+ if previousState == connectivity.Ready {
+ be.decConn(cn)
+ log.Infof("Lost connection '%s' on backend '%s'", cn.name, be.name)
+ }
+
+ switch state {
+ case connectivity.Ready:
+ log.Infof("Connection '%s' on backend '%s' becomes ready", cn.name, be.name)
+ be.incConn(cn, conn)
+ case connectivity.TransientFailure, connectivity.Connecting:
+ // we don't log these, to avoid spam
+ case connectivity.Shutdown:
+ // the connection was closed
+ log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, be.name)
+ break monitorLoop
+ case connectivity.Idle:
+ // This can only happen if the server sends a GoAway. This can
+ // only happen if the server has modified MaxConnectionIdle from
+ // its default of infinity. The only solution here is to close the
+ // connection and keepTrying()?
+ //TODO: Read the grpc source code to see if there's a different approach
+ log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, be.name)
+ break monitorLoop
}
}
- }(ctx)
+ }
}
diff --git a/afrouter/afrouter/method-router.go b/afrouter/afrouter/method-router.go
index 2c1ca4f..fd3b974 100644
--- a/afrouter/afrouter/method-router.go
+++ b/afrouter/afrouter/method-router.go
@@ -19,23 +19,66 @@
import (
"errors"
"fmt"
+ "github.com/golang/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
+ "io/ioutil"
)
const NoMeta = "nometa"
type MethodRouter struct {
- name string
- service string
- methodRouter map[string]map[string]Router // map of [metadata][method]
+ name string
+ service string
+ methodRouter map[string]map[string]Router // map of [metadata][method]
+ methodStreaming map[string]streamingDirections
+}
+
+type streamingDirections struct {
+ request bool
+ response bool
}
func newMethodRouter(config *RouterConfig) (Router, error) {
- mr := MethodRouter{name: config.Name, service: config.ProtoService, methodRouter: make(map[string]map[string]Router)}
- mr.methodRouter[NoMeta] = make(map[string]Router) // For routes not needing metadata (all expcept binding at this time)
+ // Load the protobuf descriptor file
+ fb, err := ioutil.ReadFile(config.ProtoFile)
+ if err != nil {
+ log.Errorf("Could not open proto file '%s'", config.ProtoFile)
+ return nil, err
+ }
+ if err := proto.Unmarshal(fb, &config.protoDescriptor); err != nil {
+ log.Errorf("Could not unmarshal %s, %v", "proto.pb", err)
+ return nil, err
+ }
+
+ mr := MethodRouter{
+ name: config.Name,
+ service: config.ProtoService,
+ methodRouter: map[string]map[string]Router{
+ NoMeta: make(map[string]Router), // For routes not needing metadata (all except binding at this time)
+ },
+ methodStreaming: make(map[string]streamingDirections),
+ }
log.Debugf("Processing MethodRouter config %v", *config)
+
+ for _, file := range config.protoDescriptor.File {
+ if *file.Package == config.ProtoPackage {
+ for _, service := range file.Service {
+ if *service.Name == config.ProtoService {
+ for _, method := range service.Method {
+ if clientStreaming, serverStreaming := method.ClientStreaming != nil && *method.ClientStreaming, method.ServerStreaming != nil && *method.ServerStreaming; clientStreaming || serverStreaming {
+ mr.methodStreaming[*method.Name] = streamingDirections{
+ request: clientStreaming,
+ response: serverStreaming,
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
if len(config.Routes) == 0 {
return nil, errors.New(fmt.Sprintf("Router %s must have at least one route", config.Name))
}
@@ -121,13 +164,13 @@
func (mr MethodRouter) ReplyHandler(sel interface{}) error {
switch sl := sel.(type) {
- case *sbFrame:
+ case *responseFrame:
if r, ok := mr.methodRouter[NoMeta][sl.method]; ok {
return r.ReplyHandler(sel)
}
// TODO: this case should also be an error
default: //TODO: This should really be a big error
- // A reply handler should only be called on the sbFrame
+ // A reply handler should only be called on the responseFrame
return nil
}
return nil
@@ -135,7 +178,7 @@
func (mr MethodRouter) Route(sel interface{}) *backend {
switch sl := sel.(type) {
- case *nbFrame:
+ case *requestFrame:
if r, ok := mr.methodRouter[sl.metaKey][sl.methodInfo.method]; ok {
return r.Route(sel)
}
@@ -146,6 +189,11 @@
}
}
+func (mr MethodRouter) IsStreaming(method string) (bool, bool) {
+ streamingDirections := mr.methodStreaming[method]
+ return streamingDirections.request, streamingDirections.response
+}
+
func (mr MethodRouter) BackendCluster(mthd string, metaKey string) (*cluster, error) {
if r, ok := mr.methodRouter[metaKey][mthd]; ok {
return r.BackendCluster(mthd, metaKey)
diff --git a/afrouter/afrouter/request.go b/afrouter/afrouter/request.go
new file mode 100644
index 0000000..0af20e3
--- /dev/null
+++ b/afrouter/afrouter/request.go
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "context"
+ "encoding/hex"
+ "errors"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+ "io"
+ "sync"
+)
+
+type request struct {
+ mutex sync.Mutex
+ activeResponseStreamOnce sync.Once
+ setResponseHeaderOnce sync.Once
+ responseStreamMutex sync.Mutex
+
+ streams map[string]grpc.ClientStream
+
+ requestFrameBacklog [][]byte
+ responseErrChan chan error
+ sendClosed bool
+
+ backend *backend
+ ctx context.Context
+ serverStream grpc.ServerStream
+ methodInfo methodDetails
+ requestFrame *requestFrame
+ responseFrame *responseFrame
+ isStreamingRequest bool
+ isStreamingResponse bool
+}
+
+// catchupRequestStreamThenForwardResponseStream must be called with request.mutex pre-locked
+func (r *request) catchupRequestStreamThenForwardResponseStream(connName string, stream grpc.ClientStream) {
+ r.streams[connName] = stream
+
+ // prime new streams with any traffic they might have missed (non-streaming requests only)
+ frame := *r.requestFrame // local copy of frame
+ for _, payload := range r.requestFrameBacklog {
+ frame.payload = payload
+ if err := stream.SendMsg(&frame); err != nil {
+ log.Debugf("Error on SendMsg: %s", err.Error())
+ break
+ }
+ }
+ if r.sendClosed {
+ stream.CloseSend()
+ }
+
+ r.mutex.Unlock()
+
+ r.forwardResponseStream(connName, stream)
+}
+
+// forwardResponseStream forwards the response stream
+func (r *request) forwardResponseStream(connName string, stream grpc.ClientStream) {
+ var queuedFrames [][]byte
+ frame := *r.responseFrame
+ var err error
+ activeStream := false
+ for {
+ err = stream.RecvMsg(&frame)
+ // the first thread to reach this point (first to receive a response frame) will become the active stream
+ r.activeResponseStreamOnce.Do(func() { activeStream = true })
+ if err != nil {
+ // this can be io.EOF which is the success case
+ break
+ }
+
+ if r.isStreamingResponse {
+ // streaming response - send immediately
+ if err = r.sendResponseFrame(stream, frame); err != nil {
+ break
+ }
+ } else { // !r.isStreamingResponse
+
+ if r.isStreamingRequest { // && !r.isStreamingResponse
+ // queue the frame (only send response when the last stream closes)
+ queuedFrames = append(queuedFrames, frame.payload)
+ } else { // !r.isStreamingRequest && !r.isStreamingResponse
+
+ // only the active stream will respond
+ if activeStream { // && !r.isStreamingRequest && !r.isStreamingResponse
+ // send the response immediately
+ if err = r.sendResponseFrame(stream, frame); err != nil {
+ break
+ }
+ } else { // !activeStream && !r.isStreamingRequest && !r.isStreamingResponse
+ // just read & discard until the stream dies
+ }
+ }
+ }
+ }
+
+ log.Debugf("Closing stream to %s", connName)
+
+ // io.EOF is the success case
+ if err == io.EOF {
+ err = nil
+ }
+
+ // this double-lock sets off alarm bells in my head
+ r.backend.mutex.Lock()
+ r.mutex.Lock()
+ delete(r.streams, connName)
+ streamsLeft := len(r.streams)
+
+ // if this the active stream (for non-streaming requests), or this is the last stream (for streaming requests)
+ if (activeStream && !r.isStreamingRequest && !r.isStreamingResponse) || (streamsLeft == 0 && (r.isStreamingRequest || r.isStreamingResponse)) {
+ // request is complete, cleanup
+ delete(r.backend.activeRequests, r)
+ r.mutex.Unlock()
+ r.backend.mutex.Unlock()
+
+ // send any queued frames we have (streaming request & !streaming response only, but no harm trying in other cases)
+ for _, payload := range queuedFrames {
+ if err != nil {
+ // if there's been an error, don't try to send anymore
+ break
+ }
+ frame.payload = payload
+ err = r.sendResponseFrame(stream, frame)
+ }
+
+ // We may have received Trailers as part of the call.
+ r.serverStream.SetTrailer(stream.Trailer())
+
+ // response stream complete
+ r.responseErrChan <- err
+ } else {
+ r.mutex.Unlock()
+ r.backend.mutex.Unlock()
+ }
+}
+
+func (r *request) sendResponseFrame(stream grpc.ClientStream, f responseFrame) error {
+ r.responseStreamMutex.Lock()
+ defer r.responseStreamMutex.Unlock()
+
+ // the header should only be set once, even if multiple streams can respond.
+ setHeader := false
+ r.setResponseHeaderOnce.Do(func() { setHeader = true })
+ if setHeader {
+ // This is a bit of a hack, but client to server headers are only readable after first client msg is
+ // received but must be written to server stream before the first msg is flushed.
+ // This is the only place to do it nicely.
+ md, err := stream.Header()
+ if err != nil {
+ return err
+ }
+ // Update the metadata for the response.
+ if f.metaKey != NoMeta {
+ if f.metaVal == "" {
+ // We could also always just do this
+ md.Set(f.metaKey, f.backend.name)
+ } else {
+ md.Set(f.metaKey, f.metaVal)
+ }
+ }
+ if err := r.serverStream.SendHeader(md); err != nil {
+ return err
+ }
+ }
+
+ log.Debugf("Response frame %s", hex.EncodeToString(f.payload))
+
+ return r.serverStream.SendMsg(&f)
+}
+
+func (r *request) sendAll(frame *requestFrame) error {
+ r.mutex.Lock()
+ if !r.isStreamingRequest {
+ // save frames of non-streaming requests, so we can catchup new streams
+ r.requestFrameBacklog = append(r.requestFrameBacklog, frame.payload)
+ }
+
+ // send to all existing streams
+ streams := make(map[string]grpc.ClientStream, len(r.streams))
+ for n, s := range r.streams {
+ streams[n] = s
+ }
+ r.mutex.Unlock()
+
+ var rtrn error
+ atLeastOne := false
+ atLeastOneSuccess := false
+ for _, stream := range streams {
+ if err := stream.SendMsg(frame); err != nil {
+ log.Debugf("Error on SendMsg: %s", err.Error())
+ rtrn = err
+ } else {
+ atLeastOneSuccess = true
+ }
+ atLeastOne = true
+ }
+ // If one of the streams succeeded, declare success
+ // if none did pick an error and return it.
+ if atLeastOne {
+ if atLeastOneSuccess {
+ return nil
+ } else {
+ return rtrn
+ }
+ } else {
+ err := errors.New("unable to send, all streams have closed")
+ log.Error(err)
+ return err
+ }
+}
+
+func (r *request) forwardRequestStream(src grpc.ServerStream) error {
+ // The frame buffer already has the results of a first
+ // RecvMsg in it so the first thing to do is to
+ // send it to the list of client streams and only
+ // then read some more.
+ frame := *r.requestFrame // local copy of frame
+ var rtrn error
+ for {
+ // Send the message to each of the backend streams
+ if err := r.sendAll(&frame); err != nil {
+ log.Debugf("SendAll failed %s", err.Error())
+ rtrn = err
+ break
+ }
+ log.Debugf("Request frame %s", hex.EncodeToString(frame.payload))
+ if err := src.RecvMsg(&frame); err != nil {
+ rtrn = err // this can be io.EOF which is happy case
+ break
+ }
+ }
+
+ r.mutex.Lock()
+ log.Debug("Closing southbound streams")
+ r.sendClosed = true
+ for _, stream := range r.streams {
+ stream.CloseSend()
+ }
+ r.mutex.Unlock()
+
+ if rtrn != io.EOF {
+ log.Debugf("s2cErr reporting %v", rtrn)
+ return rtrn
+ }
+ log.Debug("s2cErr reporting EOF")
+ // this is the successful case where the sender has encountered io.EOF, and won't be sending anymore.
+ // the clientStream>serverStream may continue sending though.
+ return nil
+}
diff --git a/afrouter/afrouter/round-robin-router.go b/afrouter/afrouter/round-robin-router.go
index 70f164a..60ff457 100644
--- a/afrouter/afrouter/round-robin-router.go
+++ b/afrouter/afrouter/round-robin-router.go
@@ -80,6 +80,10 @@
return "", "", nil
}
+func (rr RoundRobinRouter) IsStreaming(_ string) (bool, bool) {
+ panic("not implemented")
+}
+
func (rr RoundRobinRouter) BackendCluster(s string, mk string) (*cluster, error) {
return rr.cluster, nil
}
@@ -91,7 +95,7 @@
func (rr RoundRobinRouter) Route(sel interface{}) *backend {
var err error
switch sl := sel.(type) {
- case *nbFrame:
+ case *requestFrame:
// Since this is a round robin router just get the next backend
if *rr.currentBackend, err = rr.cluster.nextBackend(*rr.currentBackend, BackendSequenceRoundRobin); err == nil {
return *rr.currentBackend
diff --git a/afrouter/afrouter/router.go b/afrouter/afrouter/router.go
index 323ffb2..1bd2d6e 100644
--- a/afrouter/afrouter/router.go
+++ b/afrouter/afrouter/router.go
@@ -29,6 +29,7 @@
Name() string
Route(interface{}) *backend
Service() string
+ IsStreaming(string) (bool, bool)
BackendCluster(string, string) (*cluster, error)
FindBackendCluster(string) *cluster
ReplyHandler(interface{}) error
diff --git a/afrouter/afrouter/server.go b/afrouter/afrouter/server.go
index dff1519..82269d2 100644
--- a/afrouter/afrouter/server.go
+++ b/afrouter/afrouter/server.go
@@ -23,6 +23,7 @@
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"net"
+ "net/url"
"strconv"
)
@@ -58,7 +59,7 @@
rtrn_err = true
}
// Validate the ip address if one is provided
- if ip := net.ParseIP(config.Addr); config.Addr != "" && ip == nil {
+ if _, err := url.Parse(config.Addr); err != nil {
log.Errorf("Invalid address '%s' provided for server '%s'", config.Addr, config.Name)
rtrn_err = true
}
@@ -144,7 +145,7 @@
if !ok {
return grpc.Errorf(codes.Internal, "lowLevelServerStream doesn't exist in context")
}
- log.Debugf("Processing grpc request %s on server %s", fullMethodName, s.name)
+ log.Debugf("\n\nProcessing grpc request %s on server %s", fullMethodName, s.name)
methodInfo := newMethodDetails(fullMethodName)
r, ok := s.getRouter(methodInfo.pkg, methodInfo.service)
//fn, ok := s.routers[methodInfo.pkg][methodInfo.service]
@@ -156,7 +157,7 @@
log.Error(err)
return err
}
- log.Debugf("Selected router %s\n", r.Name())
+ log.Debugf("Selected router %s", r.Name())
mk, mv, err := r.GetMetaKeyVal(serverStream)
if err != nil {
diff --git a/afrouter/afrouter/streams.go b/afrouter/afrouter/streams.go
deleted file mode 100644
index bb7faea..0000000
--- a/afrouter/afrouter/streams.go
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package afrouter
-
-import (
- "context"
- "encoding/hex"
- "errors"
- "github.com/opencord/voltha-go/common/log"
- "google.golang.org/grpc"
- "google.golang.org/grpc/metadata"
- "sort"
- "sync"
-)
-
-type streams struct {
- mutex sync.Mutex
- activeStream *stream
- streams map[string]*stream
- sortedStreams []*stream
-}
-
-type stream struct {
- stream grpc.ClientStream
- ctx context.Context
- cancel context.CancelFunc
- ok2Close chan struct{}
- c2sReturn chan error
- s2cReturn error
-}
-
-func (s *streams) clientCancel() {
- for _, strm := range s.streams {
- if strm != nil {
- strm.cancel()
- }
- }
-}
-
-func (s *streams) closeSend() {
- for _, strm := range s.streams {
- if strm != nil {
- <-strm.ok2Close
- log.Debug("Closing southbound stream")
- strm.stream.CloseSend()
- }
- }
-}
-
-func (s *streams) trailer() metadata.MD {
- return s.activeStream.stream.Trailer()
-}
-
-func (s *streams) getActive() *stream {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- return s.activeStream
-}
-
-func (s *streams) setThenGetActive(strm *stream) *stream {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- if s.activeStream == nil {
- s.activeStream = strm
- }
- return s.activeStream
-}
-
-func (s *streams) forwardClientToServer(dst grpc.ServerStream, f *sbFrame) chan error {
- fc2s := func(srcS *stream) {
- for i := 0; ; i++ {
- if err := srcS.stream.RecvMsg(f); err != nil {
- if s.setThenGetActive(srcS) == srcS {
- srcS.c2sReturn <- err // this can be io.EOF which is the success case
- } else {
- srcS.c2sReturn <- nil // Inactive responder
- }
- close(srcS.ok2Close)
- break
- }
- if s.setThenGetActive(srcS) != srcS {
- srcS.c2sReturn <- nil
- continue
- }
- if i == 0 {
- // This is a bit of a hack, but client to server headers are only readable after first client msg is
- // received but must be written to server stream before the first msg is flushed.
- // This is the only place to do it nicely.
- md, err := srcS.stream.Header()
- if err != nil {
- srcS.c2sReturn <- err
- break
- }
- // Update the metadata for the response.
- if f.metaKey != NoMeta {
- if f.metaVal == "" {
- // We could also alsways just do this
- md.Set(f.metaKey, f.backend.name)
- } else {
- md.Set(f.metaKey, f.metaVal)
- }
- }
- if err := dst.SendHeader(md); err != nil {
- srcS.c2sReturn <- err
- break
- }
- }
- log.Debugf("Northbound frame %s", hex.EncodeToString(f.payload))
- if err := dst.SendMsg(f); err != nil {
- srcS.c2sReturn <- err
- break
- }
- }
- }
-
- // There should be AT LEAST one open stream at this point
- // if there isn't its a grave error in the code and it will
- // cause this thread to block here so check for it and
- // don't let the lock up happen but report the error
- ret := make(chan error, 1)
- agg := make(chan *stream)
- atLeastOne := false
- for _, strm := range s.streams {
- if strm != nil {
- go fc2s(strm)
- go func(s *stream) { // Wait on result and aggregate
- r := <-s.c2sReturn // got the return code
- if r == nil {
- return // We're the redundant stream, just die
- }
- s.c2sReturn <- r // put it back to pass it along
- agg <- s // send the stream to the aggregator
- }(strm)
- atLeastOne = true
- }
- }
- if atLeastOne == true {
- go func() { // Wait on aggregated result
- s := <-agg
- ret <- <-s.c2sReturn
- }()
- } else {
- err := errors.New("There are no open streams. Unable to forward message.")
- log.Error(err)
- ret <- err
- }
- return ret
-}
-
-func (s *streams) sendAll(f *nbFrame) error {
- var rtrn error
-
- atLeastOne := false
- for _, strm := range s.sortedStreams {
- if strm != nil {
- if err := strm.stream.SendMsg(f); err != nil {
- log.Debugf("Error on SendMsg: %s", err.Error())
- strm.s2cReturn = err
- }
- atLeastOne = true
- } else {
- log.Debugf("Nil stream")
- }
- }
- // If one of the streams succeeded, declare success
- // if none did pick an error and return it.
- if atLeastOne == true {
- for _, strm := range s.sortedStreams {
- if strm != nil {
- rtrn = strm.s2cReturn
- if rtrn == nil {
- return rtrn
- }
- }
- }
- return rtrn
- } else {
- rtrn = errors.New("There are no open streams, this should never happen")
- log.Error(rtrn)
- }
- return rtrn
-}
-
-func (s *streams) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error {
- ret := make(chan error, 1)
- go func() {
- // The frame buffer already has the results of a first
- // RecvMsg in it so the first thing to do is to
- // send it to the list of client streams and only
- // then read some more.
- for i := 0; ; i++ {
- // Send the message to each of the backend streams
- if err := s.sendAll(f); err != nil {
- ret <- err
- log.Debugf("SendAll failed %s", err.Error())
- break
- }
- log.Debugf("Southbound frame %s", hex.EncodeToString(f.payload))
- if err := src.RecvMsg(f); err != nil {
- ret <- err // this can be io.EOF which is happy case
- break
- }
- }
- }()
- return ret
-}
-
-func (s *streams) sortStreams() {
- var tmpKeys []string
- for k := range s.streams {
- tmpKeys = append(tmpKeys, k)
- }
- sort.Strings(tmpKeys)
- for _, v := range tmpKeys {
- s.sortedStreams = append(s.sortedStreams, s.streams[v])
- }
-}
diff --git a/afrouter/arouter.json b/afrouter/arouter.json
index 7739d65..5036402 100644
--- a/afrouter/arouter.json
+++ b/afrouter/arouter.json
@@ -3,7 +3,7 @@
{
"name": "grpc_command",
"port": 55555,
- "address": "",
+ "address": "localhost",
"type": "grpc",
"routers": [
{
@@ -20,10 +20,10 @@
"name": "vcore",
"package": "voltha",
"service": "VolthaService",
+ "proto_descriptor": "vendor/github.com/opencord/voltha-protos/go/voltha.pb",
"routes": [
{
"name": "dev_manager",
- "proto_descriptor": "voltha.pb",
"type": "rpc_affinity_message",
"association": "round_robin",
"routing_field": "id",
@@ -273,7 +273,7 @@
],
"api": {
"_comment": "If this isn't defined then no api is available for dynamic configuration and queries",
- "address": "",
+ "address": "localhost",
"port": 55554
}
}