VOL-1967 move api-server to separate repository

Current with voltha-go acf0adaf2d91ae72b55192cc8a939e0485918d16

Change-Id: I000ea6be0789e20c922bd671562b58a7120892ae
diff --git a/internal/pkg/afrouter/request.go b/internal/pkg/afrouter/request.go
new file mode 100644
index 0000000..8fc15aa
--- /dev/null
+++ b/internal/pkg/afrouter/request.go
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+	"context"
+	"encoding/hex"
+	"errors"
+	"github.com/opencord/voltha-go/common/log"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"io"
+	"sync"
+)
+
+type request struct {
+	mutex                    sync.Mutex
+	activeResponseStreamOnce sync.Once
+	setResponseHeaderOnce    sync.Once
+	responseStreamMutex      sync.Mutex
+
+	streams map[string]grpc.ClientStream
+
+	requestFrameBacklog [][]byte
+	responseErrChan     chan error
+	sendClosed          bool
+
+	backend             *backend
+	ctx                 context.Context
+	serverStream        grpc.ServerStream
+	methodInfo          methodDetails
+	requestFrame        *requestFrame
+	responseFrame       *responseFrame
+	isStreamingRequest  bool
+	isStreamingResponse bool
+}
+
+var transactionNotAcquiredErrorString = status.Error(codes.Canceled, "transaction-not-acquired").Error()
+
+// catchupRequestStreamThenForwardResponseStream must be called with request.mutex pre-locked
+func (r *request) catchupRequestStreamThenForwardResponseStream(connName string, stream grpc.ClientStream) {
+	r.streams[connName] = stream
+
+	// prime new streams with any traffic they might have missed (non-streaming requests only)
+	frame := *r.requestFrame // local copy of frame
+	for _, payload := range r.requestFrameBacklog {
+		frame.payload = payload
+		if err := stream.SendMsg(&frame); err != nil {
+			log.Debugf("Error on SendMsg: %s", err.Error())
+			break
+		}
+	}
+	if r.sendClosed {
+		stream.CloseSend()
+	}
+
+	r.mutex.Unlock()
+
+	r.forwardResponseStream(connName, stream)
+}
+
+// forwardResponseStream forwards the response stream
+func (r *request) forwardResponseStream(connName string, stream grpc.ClientStream) {
+	var queuedFrames [][]byte
+	frame := *r.responseFrame
+	var err error
+	activeStream := false
+	for {
+		err = stream.RecvMsg(&frame)
+		// if this is an inactive responder, ignore everything it sends
+		if err != nil && err.Error() == transactionNotAcquiredErrorString {
+			break
+		}
+		// the first thread to reach this point (first to receive a response frame) will become the active stream
+		r.activeResponseStreamOnce.Do(func() { activeStream = true })
+		if err != nil {
+			// this can be io.EOF which is the success case
+			break
+		}
+
+		if r.isStreamingResponse {
+			// streaming response - send immediately
+			if err = r.sendResponseFrame(stream, frame); err != nil {
+				break
+			}
+		} else { // !r.isStreamingResponse
+
+			if r.isStreamingRequest { // && !r.isStreamingResponse
+				// queue the frame (only send response when the last stream closes)
+				queuedFrames = append(queuedFrames, frame.payload)
+			} else { // !r.isStreamingRequest && !r.isStreamingResponse
+
+				// only the active stream will respond
+				if activeStream { // && !r.isStreamingRequest && !r.isStreamingResponse
+					// send the response immediately
+					if err = r.sendResponseFrame(stream, frame); err != nil {
+						break
+					}
+				} else { // !activeStream && !r.isStreamingRequest && !r.isStreamingResponse
+					// just read & discard until the stream dies
+				}
+			}
+		}
+	}
+
+	log.Debugf("Closing stream to %s", connName)
+
+	// io.EOF is the success case
+	if err == io.EOF {
+		err = nil
+	}
+
+	// this double-lock sets off alarm bells in my head
+	r.backend.mutex.Lock()
+	r.mutex.Lock()
+	delete(r.streams, connName)
+	streamsLeft := len(r.streams)
+
+	// handle the case where no cores are the active responder.  Should never happen, but just in case...
+	if streamsLeft == 0 {
+		r.activeResponseStreamOnce.Do(func() { activeStream = true })
+	}
+
+	// if this the active stream (for non-streaming requests), or this is the last stream (for streaming requests)
+	if (activeStream && !r.isStreamingRequest && !r.isStreamingResponse) || (streamsLeft == 0 && (r.isStreamingRequest || r.isStreamingResponse)) {
+		// request is complete, cleanup
+		delete(r.backend.activeRequests, r)
+		r.mutex.Unlock()
+		r.backend.mutex.Unlock()
+
+		// send any queued frames we have (streaming request & !streaming response only, but no harm trying in other cases)
+		for _, payload := range queuedFrames {
+			if err != nil {
+				// if there's been an error, don't try to send anymore
+				break
+			}
+			frame.payload = payload
+			err = r.sendResponseFrame(stream, frame)
+		}
+
+		// We may have received Trailers as part of the call.
+		r.serverStream.SetTrailer(stream.Trailer())
+
+		// response stream complete
+		r.responseErrChan <- err
+	} else {
+		r.mutex.Unlock()
+		r.backend.mutex.Unlock()
+	}
+}
+
+func (r *request) sendResponseFrame(stream grpc.ClientStream, f responseFrame) error {
+	r.responseStreamMutex.Lock()
+	defer r.responseStreamMutex.Unlock()
+
+	// the header should only be set once, even if multiple streams can respond.
+	setHeader := false
+	r.setResponseHeaderOnce.Do(func() { setHeader = true })
+	if setHeader {
+		// This is a bit of a hack, but client to server headers are only readable after first client msg is
+		// received but must be written to server stream before the first msg is flushed.
+		// This is the only place to do it nicely.
+		md, err := stream.Header()
+		if err != nil {
+			return err
+		}
+		// Update the metadata for the response.
+		if f.metaKey != NoMeta {
+			if f.metaVal == "" {
+				// We could also always just do this
+				md.Set(f.metaKey, f.backend.name)
+			} else {
+				md.Set(f.metaKey, f.metaVal)
+			}
+		}
+		if err := r.serverStream.SendHeader(md); err != nil {
+			return err
+		}
+	}
+
+	log.Debugf("Response frame %s", hex.EncodeToString(f.payload))
+
+	return r.serverStream.SendMsg(&f)
+}
+
+func (r *request) sendAll(frame *requestFrame) error {
+	r.mutex.Lock()
+	if !r.isStreamingRequest {
+		// save frames of non-streaming requests, so we can catchup new streams
+		r.requestFrameBacklog = append(r.requestFrameBacklog, frame.payload)
+	}
+
+	// send to all existing streams
+	streams := make(map[string]grpc.ClientStream, len(r.streams))
+	for n, s := range r.streams {
+		streams[n] = s
+	}
+	r.mutex.Unlock()
+
+	var rtrn error
+	atLeastOne := false
+	atLeastOneSuccess := false
+	for _, stream := range streams {
+		if err := stream.SendMsg(frame); err != nil {
+			log.Debugf("Error on SendMsg: %s", err.Error())
+			rtrn = err
+		} else {
+			atLeastOneSuccess = true
+		}
+		atLeastOne = true
+	}
+	// If one of the streams succeeded, declare success
+	// if none did pick an error and return it.
+	if atLeastOne {
+		if atLeastOneSuccess {
+			return nil
+		} else {
+			return rtrn
+		}
+	} else {
+		err := errors.New("unable to send, all streams have closed")
+		log.Error(err)
+		return err
+	}
+}
+
+func (r *request) forwardRequestStream(src grpc.ServerStream) error {
+	// The frame buffer already has the results of a first
+	// RecvMsg in it so the first thing to do is to
+	// send it to the list of client streams and only
+	// then read some more.
+	frame := *r.requestFrame // local copy of frame
+	var rtrn error
+	for {
+		// Send the message to each of the backend streams
+		if err := r.sendAll(&frame); err != nil {
+			log.Debugf("SendAll failed %s", err.Error())
+			rtrn = err
+			break
+		}
+		log.Debugf("Request frame %s", hex.EncodeToString(frame.payload))
+		if err := src.RecvMsg(&frame); err != nil {
+			rtrn = err // this can be io.EOF which is happy case
+			break
+		}
+	}
+
+	r.mutex.Lock()
+	log.Debug("Closing southbound streams")
+	r.sendClosed = true
+	for _, stream := range r.streams {
+		stream.CloseSend()
+	}
+	r.mutex.Unlock()
+
+	if rtrn != io.EOF {
+		log.Debugf("s2cErr reporting %v", rtrn)
+		return rtrn
+	}
+	log.Debug("s2cErr reporting EOF")
+	// this is the successful case where the sender has encountered io.EOF, and won't be sending anymore.
+	// the clientStream>serverStream may continue sending though.
+	return nil
+}