[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/tmc/grpc-websocket-proxy/wsproxy/websocket_proxy.go b/vendor/github.com/tmc/grpc-websocket-proxy/wsproxy/websocket_proxy.go
index 0fca05a..7092162 100644
--- a/vendor/github.com/tmc/grpc-websocket-proxy/wsproxy/websocket_proxy.go
+++ b/vendor/github.com/tmc/grpc-websocket-proxy/wsproxy/websocket_proxy.go
@@ -2,9 +2,11 @@
 
 import (
 	"bufio"
+	"fmt"
 	"io"
 	"net/http"
 	"strings"
+	"time"
 
 	"github.com/gorilla/websocket"
 	"github.com/sirupsen/logrus"
@@ -26,11 +28,16 @@
 
 // Proxy provides websocket transport upgrade to compatible endpoints.
 type Proxy struct {
-	h                   http.Handler
-	logger              Logger
-	methodOverrideParam string
-	tokenCookieName     string
-	requestMutator      RequestMutatorFunc
+	h                      http.Handler
+	logger                 Logger
+	maxRespBodyBufferBytes int // max passed to bufio.Scanner.Buffer for the response-body scanner; used only when > 0
+	methodOverrideParam    string
+	tokenCookieName        string
+	requestMutator         RequestMutatorFunc
+	headerForwarder        func(header string) bool // called with each request header name; true means copy it to the proxied request
+	pingInterval           time.Duration            // ticker period of the ping write loop
+	pingWait               time.Duration            // write deadline applied before each ping
+	pongWait               time.Duration            // read deadline, renewed by the pong handler
 }
 
 // Logger collects log messages.
@@ -50,6 +57,15 @@
 // Option allows customization of the proxy.
 type Option func(*Proxy)
 
+// WithMaxRespBodyBufferSize sets the max size, in bytes, handed to
+// bufio.Scanner.Buffer for the scanner that reads the response body. Values
+// <= 0 keep the scanner's default limit (bufio.MaxScanTokenSize).
+func WithMaxRespBodyBufferSize(nBytes int) Option {
+	return func(p *Proxy) {
+		p.maxRespBodyBufferBytes = nBytes
+	}
+}
+
 // WithMethodParamOverride allows specification of the special http parameter that is used in the proxied streaming request.
 func WithMethodParamOverride(param string) Option {
 	return func(p *Proxy) {
@@ -71,6 +87,13 @@
 	}
 }
 
+// WithForwardedHeaders sets the predicate that selects which incoming request headers are copied to the proxied request; fn must be non-nil because it is called without a nil check.
+func WithForwardedHeaders(fn func(header string) bool) Option {
+	return func(p *Proxy) {
+		p.headerForwarder = fn // replaces defaultHeaderForwarder
+	}
+}
+
 // WithLogger allows a custom FieldLogger to be supplied
 func WithLogger(logger Logger) Option {
 	return func(p *Proxy) {
@@ -78,6 +101,28 @@
 	}
 }
 
+// WithPingControl enables websocket pings sent every interval. The read
+// deadline awaiting a pong is (interval * 10) / 9 and the ping write deadline
+// is one sixth of that. An interval <= 0 leaves pings disabled.
+func WithPingControl(interval time.Duration) Option {
+	return func(proxy *Proxy) {
+		proxy.pingInterval = interval
+		proxy.pongWait = (interval * 10) / 9 // 10/9 of interval, i.e. slightly longer than one ping period
+		proxy.pingWait = proxy.pongWait / 6
+	}
+}
+
+var defaultHeadersToForward = map[string]bool{ // header names accepted by defaultHeaderForwarder
+	"Origin":  true,
+	"origin":  true,
+	"Referer": true,
+	"referer": true,
+}
+
+func defaultHeaderForwarder(header string) bool { // reports whether header is one of defaultHeadersToForward
+	return defaultHeadersToForward[header] // exact-match lookup; names missing from the map yield false
+}
+
 // WebsocketProxy attempts to expose the underlying handler as a bidi websocket stream with newline-delimited
 // JSON as the content encoding.
 //
@@ -96,6 +141,7 @@
 		logger:              logrus.New(),
 		methodOverrideParam: MethodOverrideParam,
 		tokenCookieName:     TokenCookieName,
+		headerForwarder:     defaultHeaderForwarder,
 	}
 	for _, o := range opts {
 		o(p)
@@ -138,13 +184,18 @@
 	defer cancelFn()
 
 	requestBodyR, requestBodyW := io.Pipe()
-	request, err := http.NewRequest(r.Method, r.URL.String(), requestBodyR)
+	request, err := http.NewRequestWithContext(r.Context(), r.Method, r.URL.String(), requestBodyR)
 	if err != nil {
 		p.logger.Warnln("error preparing request:", err)
 		return
 	}
 	if swsp := r.Header.Get("Sec-WebSocket-Protocol"); swsp != "" {
-		request.Header.Set("Authorization", strings.Replace(swsp, "Bearer, ", "Bearer ", 1))
+		request.Header.Set("Authorization", transformSubProtocolHeader(swsp))
+	}
+	for header := range r.Header {
+		if p.headerForwarder(header) {
+			request.Header.Set(header, r.Header.Get(header))
+		}
 	}
 	// If token cookie is present, populate Authorization header from the cookie instead.
 	if cookie, err := r.Cookie(p.tokenCookieName); err == nil {
@@ -175,6 +226,10 @@
 
 	// read loop -- take messages from websocket and write to http request
 	go func() {
+		if p.pingInterval > 0 && p.pingWait > 0 && p.pongWait > 0 {
+			conn.SetReadDeadline(time.Now().Add(p.pongWait))
+			conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(p.pongWait)); return nil })
+		}
 		defer func() {
 			cancelFn()
 		}()
@@ -206,8 +261,38 @@
 			}
 		}
 	}()
+	// ping write loop
+	if p.pingInterval > 0 && p.pingWait > 0 && p.pongWait > 0 {
+		go func() {
+			ticker := time.NewTicker(p.pingInterval)
+			defer func() {
+				ticker.Stop()
+				conn.Close()
+			}()
+			for {
+				select {
+				case <-ctx.Done():
+					p.logger.Debugln("ping loop done")
+					return
+				case <-ticker.C:
+					conn.SetWriteDeadline(time.Now().Add(p.pingWait))
+					if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
+						return
+					}
+				}
+			}
+		}()
+	}
 	// write loop -- take messages from response and write to websocket
 	scanner := bufio.NewScanner(responseBodyR)
+
+	// if maxRespBodyBufferSize has been specified, use custom buffer for scanner
+	var scannerBuf []byte
+	if p.maxRespBodyBufferBytes > 0 {
+		scannerBuf = make([]byte, 0, 64*1024)
+		scanner.Buffer(scannerBuf, p.maxRespBodyBufferBytes)
+	}
+
 	for scanner.Scan() {
 		if len(scanner.Bytes()) == 0 {
 			p.logger.Warnln("[write] empty scan", scanner.Err())
@@ -239,6 +324,17 @@
 	}
 }
 
+// transformSubProtocolHeader rewrites a Sec-WebSocket-Protocol value such as "Bearer, <token>" into "Bearer <token>"; per the original note, IE and Edge do not delimit Sec-WebSocket-Protocol strings with spaces.
+func transformSubProtocolHeader(header string) string {
+	tokens := strings.SplitN(header, "Bearer,", 2) // split on the first "Bearer," only; anything before it is discarded
+	// Without a "Bearer," marker there is no token to extract, so the result is empty.
+	if len(tokens) < 2 {
+		return ""
+	}
+	// strings.Trim removes only space characters from both ends of the token.
+	return fmt.Sprintf("Bearer %v", strings.Trim(tokens[1], " "))
+}
+
 func (w *inMemoryResponseWriter) Write(b []byte) (int, error) {
 	return w.Writer.Write(b)
 }