General afrouter cleanup.
- Separated backend.go into multiple files.
- Replaced array indexing hack with enum pattern.
- Various renaming for better consistency.
- Removed a few unused structs.
- Replaced a thread with an atomic operation.
Change-Id: I2239692cac21ddb7f513b6d8c247ffa8789714ac
diff --git a/afrouter/afrouter/streams.go b/afrouter/afrouter/streams.go
new file mode 100644
index 0000000..458ed06
--- /dev/null
+++ b/afrouter/afrouter/streams.go
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "context"
+ "errors"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+ "sort"
+ "sync"
+)
+
+type streams struct {
+ mutex sync.Mutex
+ activeStream *stream
+ streams map[string]*stream
+ sortedStreams []*stream
+}
+
+type stream struct {
+ stream grpc.ClientStream
+ ctx context.Context
+ cancel context.CancelFunc
+ ok2Close chan struct{}
+ c2sReturn chan error
+ s2cReturn error
+}
+
+func (s *streams) clientCancel() {
+ for _, strm := range s.streams {
+ if strm != nil {
+ strm.cancel()
+ }
+ }
+}
+
+func (s *streams) closeSend() {
+ for _, strm := range s.streams {
+ if strm != nil {
+ <-strm.ok2Close
+ log.Debug("Closing southbound stream")
+ strm.stream.CloseSend()
+ }
+ }
+}
+
+func (s *streams) trailer() metadata.MD {
+ return s.activeStream.stream.Trailer()
+}
+
+func (s *streams) getActive() *stream {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ return s.activeStream
+}
+
+func (s *streams) setThenGetActive(strm *stream) *stream {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ if s.activeStream == nil {
+ s.activeStream = strm
+ }
+ return s.activeStream
+}
+
+func (s *streams) forwardClientToServer(dst grpc.ServerStream, f *sbFrame) chan error {
+ fc2s := func(srcS *stream) {
+ for i := 0; ; i++ {
+ if err := srcS.stream.RecvMsg(f); err != nil {
+ if s.setThenGetActive(srcS) == srcS {
+ srcS.c2sReturn <- err // this can be io.EOF which is the success case
+ } else {
+ srcS.c2sReturn <- nil // Inactive responder
+ }
+ close(srcS.ok2Close)
+ break
+ }
+ if s.setThenGetActive(srcS) != srcS {
+ srcS.c2sReturn <- nil
+ continue
+ }
+ if i == 0 {
+ // This is a bit of a hack, but client to server headers are only readable after first client msg is
+ // received but must be written to server stream before the first msg is flushed.
+ // This is the only place to do it nicely.
+ md, err := srcS.stream.Header()
+ if err != nil {
+ srcS.c2sReturn <- err
+ break
+ }
+ // Update the metadata for the response.
+ if f.metaKey != NoMeta {
+ if f.metaVal == "" {
+ // We could also alsways just do this
+ md.Set(f.metaKey, f.backend.name)
+ } else {
+ md.Set(f.metaKey, f.metaVal)
+ }
+ }
+ if err := dst.SendHeader(md); err != nil {
+ srcS.c2sReturn <- err
+ break
+ }
+ }
+ log.Debugf("Northbound frame %v", f.payload)
+ if err := dst.SendMsg(f); err != nil {
+ srcS.c2sReturn <- err
+ break
+ }
+ }
+ }
+
+ // There should be AT LEAST one open stream at this point
+ // if there isn't its a grave error in the code and it will
+ // cause this thread to block here so check for it and
+ // don't let the lock up happen but report the error
+ ret := make(chan error, 1)
+ agg := make(chan *stream)
+ atLeastOne := false
+ for _, strm := range s.streams {
+ if strm != nil {
+ go fc2s(strm)
+ go func(s *stream) { // Wait on result and aggregate
+ r := <-s.c2sReturn // got the return code
+ if r == nil {
+ return // We're the redundant stream, just die
+ }
+ s.c2sReturn <- r // put it back to pass it along
+ agg <- s // send the stream to the aggregator
+ }(strm)
+ atLeastOne = true
+ }
+ }
+ if atLeastOne == true {
+ go func() { // Wait on aggregated result
+ s := <-agg
+ ret <- <-s.c2sReturn
+ }()
+ } else {
+ err := errors.New("There are no open streams. Unable to forward message.")
+ log.Error(err)
+ ret <- err
+ }
+ return ret
+}
+
+func (s *streams) sendAll(f *nbFrame) error {
+ var rtrn error
+
+ atLeastOne := false
+ for _, strm := range s.sortedStreams {
+ if strm != nil {
+ if err := strm.stream.SendMsg(f); err != nil {
+ log.Debugf("Error on SendMsg: %s", err.Error())
+ strm.s2cReturn = err
+ }
+ atLeastOne = true
+ } else {
+ log.Debugf("Nil stream")
+ }
+ }
+ // If one of the streams succeeded, declare success
+ // if none did pick an error and return it.
+ if atLeastOne == true {
+ for _, strm := range s.sortedStreams {
+ if strm != nil {
+ rtrn = strm.s2cReturn
+ if rtrn == nil {
+ return rtrn
+ }
+ }
+ }
+ return rtrn
+ } else {
+ rtrn = errors.New("There are no open streams, this should never happen")
+ log.Error(rtrn)
+ }
+ return rtrn
+}
+
+func (s *streams) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error {
+ ret := make(chan error, 1)
+ go func() {
+ // The frame buffer already has the results of a first
+ // RecvMsg in it so the first thing to do is to
+ // send it to the list of client streams and only
+ // then read some more.
+ for i := 0; ; i++ {
+ // Send the message to each of the backend streams
+ if err := s.sendAll(f); err != nil {
+ ret <- err
+ log.Debugf("SendAll failed %s", err.Error())
+ break
+ }
+ log.Debugf("Southbound frame %v", f.payload)
+ if err := src.RecvMsg(f); err != nil {
+ ret <- err // this can be io.EOF which is happy case
+ break
+ }
+ }
+ }()
+ return ret
+}
+
+func (s *streams) sortStreams() {
+ var tmpKeys []string
+ for k := range s.streams {
+ tmpKeys = append(tmpKeys, k)
+ }
+ sort.Strings(tmpKeys)
+ for _, v := range tmpKeys {
+ s.sortedStreams = append(s.sortedStreams, s.streams[v])
+ }
+}