initial add - go fmt on grpc

Change-Id: Ib0afadd2fe5571d1456a091f94f5644458f7d3f4
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
new file mode 100644
index 0000000..78f9ddc
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -0,0 +1,431 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// This file is the implementation of a gRPC server using HTTP/2 which
+// uses the standard Go http2 Server implementation (via the
+// http.Handler interface), rather than speaking low-level HTTP/2
+// frames itself. It is the implementation of *grpc.Server.ServeHTTP.
+
+package transport
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"net/http"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+	"golang.org/x/net/http2"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/peer"
+	"google.golang.org/grpc/stats"
+	"google.golang.org/grpc/status"
+)
+
+// NewServerHandlerTransport returns a ServerTransport handling gRPC
+// from inside an http.Handler. It requires that the http Server
+// supports HTTP/2.
+func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) {
+	if r.ProtoMajor != 2 {
+		return nil, errors.New("gRPC requires HTTP/2")
+	}
+	if r.Method != "POST" {
+		return nil, errors.New("invalid gRPC request method")
+	}
+	contentType := r.Header.Get("Content-Type")
+	// TODO: do we assume contentType is lowercase? we did before
+	contentSubtype, validContentType := contentSubtype(contentType)
+	if !validContentType {
+		return nil, errors.New("invalid gRPC request content-type")
+	}
+	if _, ok := w.(http.Flusher); !ok {
+		return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
+	}
+
+	st := &serverHandlerTransport{
+		rw:             w,
+		req:            r,
+		closedCh:       make(chan struct{}),
+		writes:         make(chan func()),
+		contentType:    contentType,
+		contentSubtype: contentSubtype,
+		stats:          stats,
+	}
+
+	if v := r.Header.Get("grpc-timeout"); v != "" {
+		to, err := decodeTimeout(v)
+		if err != nil {
+			return nil, status.Errorf(codes.Internal, "malformed time-out: %v", err)
+		}
+		st.timeoutSet = true
+		st.timeout = to
+	}
+
+	metakv := []string{"content-type", contentType}
+	if r.Host != "" {
+		metakv = append(metakv, ":authority", r.Host)
+	}
+	for k, vv := range r.Header {
+		k = strings.ToLower(k)
+		if isReservedHeader(k) && !isWhitelistedHeader(k) {
+			continue
+		}
+		for _, v := range vv {
+			v, err := decodeMetadataHeader(k, v)
+			if err != nil {
+				return nil, status.Errorf(codes.Internal, "malformed binary metadata: %v", err)
+			}
+			metakv = append(metakv, k, v)
+		}
+	}
+	st.headerMD = metadata.Pairs(metakv...)
+
+	return st, nil
+}
+
+// serverHandlerTransport is an implementation of ServerTransport
+// which replies to exactly one gRPC request (exactly one HTTP request),
+// using the net/http.Handler interface. This http.Handler is guaranteed
+// at this point to be speaking over HTTP/2, so it's able to speak valid
+// gRPC.
+type serverHandlerTransport struct {
+	rw               http.ResponseWriter
+	req              *http.Request
+	timeoutSet       bool
+	timeout          time.Duration
+	didCommonHeaders bool
+
+	headerMD metadata.MD
+
+	closeOnce sync.Once
+	closedCh  chan struct{} // closed on Close
+
+	// writes is a channel of code to run serialized in the
+	// ServeHTTP (HandleStreams) goroutine. The channel is closed
+	// when WriteStatus is called.
+	writes chan func()
+
+	// block concurrent WriteStatus calls
+	// e.g. grpc/(*serverStream).SendMsg/RecvMsg
+	writeStatusMu sync.Mutex
+
+	// we just mirror the request content-type
+	contentType string
+	// we store both contentType and contentSubtype so we don't keep recreating them
+	// TODO make sure this is consistent across handler_server and http2_server
+	contentSubtype string
+
+	stats stats.Handler
+}
+
+func (ht *serverHandlerTransport) Close() error {
+	ht.closeOnce.Do(ht.closeCloseChanOnce)
+	return nil
+}
+
+func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }
+
+func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
+
+// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
+// the empty string if unknown.
+type strAddr string
+
+func (a strAddr) Network() string {
+	if a != "" {
+		// Per the documentation on net/http.Request.RemoteAddr, if this is
+		// set, it's set to the IP:port of the peer (hence, TCP):
+		// https://golang.org/pkg/net/http/#Request
+		//
+		// If we want to support Unix sockets later, we can
+		// add our own grpc-specific convention within the
+		// grpc codebase to set RemoteAddr to a different
+		// format, or probably better: we can attach it to the
+		// context and use that from serverHandlerTransport.RemoteAddr.
+		return "tcp"
+	}
+	return ""
+}
+
+func (a strAddr) String() string { return string(a) }
+
+// do runs fn in the ServeHTTP goroutine.
+func (ht *serverHandlerTransport) do(fn func()) error {
+	select {
+	case <-ht.closedCh:
+		return ErrConnClosing
+	case ht.writes <- fn:
+		return nil
+	}
+}
+
+func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
+	ht.writeStatusMu.Lock()
+	defer ht.writeStatusMu.Unlock()
+
+	err := ht.do(func() {
+		ht.writeCommonHeaders(s)
+
+		// And flush, in case no header or body has been sent yet.
+		// This forces a separation of headers and trailers if this is the
+		// first call (for example, in end2end tests's TestNoService).
+		ht.rw.(http.Flusher).Flush()
+
+		h := ht.rw.Header()
+		h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
+		if m := st.Message(); m != "" {
+			h.Set("Grpc-Message", encodeGrpcMessage(m))
+		}
+
+		if p := st.Proto(); p != nil && len(p.Details) > 0 {
+			stBytes, err := proto.Marshal(p)
+			if err != nil {
+				// TODO: return error instead, when callers are able to handle it.
+				panic(err)
+			}
+
+			h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes))
+		}
+
+		if md := s.Trailer(); len(md) > 0 {
+			for k, vv := range md {
+				// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
+				if isReservedHeader(k) {
+					continue
+				}
+				for _, v := range vv {
+					// http2 ResponseWriter mechanism to send undeclared Trailers after
+					// the headers have possibly been written.
+					h.Add(http2.TrailerPrefix+k, encodeMetadataHeader(k, v))
+				}
+			}
+		}
+	})
+
+	if err == nil { // transport has not been closed
+		if ht.stats != nil {
+			ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
+		}
+	}
+	ht.Close()
+	return err
+}
+
+// writeCommonHeaders sets common headers on the first write
+// call (Write, WriteHeader, or WriteStatus).
+func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
+	if ht.didCommonHeaders {
+		return
+	}
+	ht.didCommonHeaders = true
+
+	h := ht.rw.Header()
+	h["Date"] = nil // suppress Date to make tests happy; TODO: restore
+	h.Set("Content-Type", ht.contentType)
+
+	// Predeclare trailers we'll set later in WriteStatus (after the body).
+	// This is a SHOULD in the HTTP RFC, and the way you add (known)
+	// Trailers per the net/http.ResponseWriter contract.
+	// See https://golang.org/pkg/net/http/#ResponseWriter
+	// and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
+	h.Add("Trailer", "Grpc-Status")
+	h.Add("Trailer", "Grpc-Message")
+	h.Add("Trailer", "Grpc-Status-Details-Bin")
+
+	if s.sendCompress != "" {
+		h.Set("Grpc-Encoding", s.sendCompress)
+	}
+}
+
+func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
+	return ht.do(func() {
+		ht.writeCommonHeaders(s)
+		ht.rw.Write(hdr)
+		ht.rw.Write(data)
+		ht.rw.(http.Flusher).Flush()
+	})
+}
+
+func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
+	err := ht.do(func() {
+		ht.writeCommonHeaders(s)
+		h := ht.rw.Header()
+		for k, vv := range md {
+			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
+			if isReservedHeader(k) {
+				continue
+			}
+			for _, v := range vv {
+				v = encodeMetadataHeader(k, v)
+				h.Add(k, v)
+			}
+		}
+		ht.rw.WriteHeader(200)
+		ht.rw.(http.Flusher).Flush()
+	})
+
+	if err == nil {
+		if ht.stats != nil {
+			ht.stats.HandleRPC(s.Context(), &stats.OutHeader{})
+		}
+	}
+	return err
+}
+
+func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
+	// With this transport type there will be exactly 1 stream: this HTTP request.
+
+	ctx := ht.req.Context()
+	var cancel context.CancelFunc
+	if ht.timeoutSet {
+		ctx, cancel = context.WithTimeout(ctx, ht.timeout)
+	} else {
+		ctx, cancel = context.WithCancel(ctx)
+	}
+
+	// requestOver is closed when the status has been written via WriteStatus.
+	requestOver := make(chan struct{})
+	go func() {
+		select {
+		case <-requestOver:
+		case <-ht.closedCh:
+		case <-ht.req.Context().Done():
+		}
+		cancel()
+		ht.Close()
+	}()
+
+	req := ht.req
+
+	s := &Stream{
+		id:             0, // irrelevant
+		requestRead:    func(int) {},
+		cancel:         cancel,
+		buf:            newRecvBuffer(),
+		st:             ht,
+		method:         req.URL.Path,
+		recvCompress:   req.Header.Get("grpc-encoding"),
+		contentSubtype: ht.contentSubtype,
+	}
+	pr := &peer.Peer{
+		Addr: ht.RemoteAddr(),
+	}
+	if req.TLS != nil {
+		pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
+	}
+	ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
+	s.ctx = peer.NewContext(ctx, pr)
+	if ht.stats != nil {
+		s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
+		inHeader := &stats.InHeader{
+			FullMethod:  s.method,
+			RemoteAddr:  ht.RemoteAddr(),
+			Compression: s.recvCompress,
+		}
+		ht.stats.HandleRPC(s.ctx, inHeader)
+	}
+	s.trReader = &transportReader{
+		reader:        &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
+		windowHandler: func(int) {},
+	}
+
+	// readerDone is closed when the Body.Read-ing goroutine exits.
+	readerDone := make(chan struct{})
+	go func() {
+		defer close(readerDone)
+
+		// TODO: minimize garbage, optimize recvBuffer code/ownership
+		const readSize = 8196
+		for buf := make([]byte, readSize); ; {
+			n, err := req.Body.Read(buf)
+			if n > 0 {
+				s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
+				buf = buf[n:]
+			}
+			if err != nil {
+				s.buf.put(recvMsg{err: mapRecvMsgError(err)})
+				return
+			}
+			if len(buf) == 0 {
+				buf = make([]byte, readSize)
+			}
+		}
+	}()
+
+	// startStream is provided by the *grpc.Server's serveStreams.
+	// It starts a goroutine serving s and exits immediately.
+	// The goroutine that is started is the one that then calls
+	// into ht, calling WriteHeader, Write, WriteStatus, Close, etc.
+	startStream(s)
+
+	ht.runStream()
+	close(requestOver)
+
+	// Wait for reading goroutine to finish.
+	req.Body.Close()
+	<-readerDone
+}
+
+func (ht *serverHandlerTransport) runStream() {
+	for {
+		select {
+		case fn := <-ht.writes:
+			fn()
+		case <-ht.closedCh:
+			return
+		}
+	}
+}
+
+func (ht *serverHandlerTransport) IncrMsgSent() {}
+
+func (ht *serverHandlerTransport) IncrMsgRecv() {}
+
+func (ht *serverHandlerTransport) Drain() {
+	panic("Drain() is not implemented")
+}
+
+// mapRecvMsgError returns the non-nil err into the appropriate
+// error value as expected by callers of *grpc.parser.recvMsg.
+// In particular, in can only be:
+//   * io.EOF
+//   * io.ErrUnexpectedEOF
+//   * of type transport.ConnectionError
+//   * an error from the status package
+func mapRecvMsgError(err error) error {
+	if err == io.EOF || err == io.ErrUnexpectedEOF {
+		return err
+	}
+	if se, ok := err.(http2.StreamError); ok {
+		if code, ok := http2ErrConvTab[se.Code]; ok {
+			return status.Error(code, se.Error())
+		}
+	}
+	if strings.Contains(err.Error(), "body closed by handler") {
+		return status.Error(codes.Canceled, err.Error())
+	}
+	return connectionErrorf(true, err, err.Error())
+}