[VOL-4290] Voltha go library updates for gRPC migration
Change-Id: I1aa2774beb6b7ed7419bc45aeb53fcae8a8ecda0
diff --git a/vendor/github.com/tmc/grpc-websocket-proxy/wsproxy/websocket_proxy.go b/vendor/github.com/tmc/grpc-websocket-proxy/wsproxy/websocket_proxy.go
index 0fca05a..7092162 100644
--- a/vendor/github.com/tmc/grpc-websocket-proxy/wsproxy/websocket_proxy.go
+++ b/vendor/github.com/tmc/grpc-websocket-proxy/wsproxy/websocket_proxy.go
@@ -2,9 +2,11 @@
import (
"bufio"
+ "fmt"
"io"
"net/http"
"strings"
+ "time"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
@@ -26,11 +28,16 @@
// Proxy provides websocket transport upgrade to compatible endpoints.
type Proxy struct {
- h http.Handler
- logger Logger
- methodOverrideParam string
- tokenCookieName string
- requestMutator RequestMutatorFunc
+ h http.Handler
+ logger Logger
+ maxRespBodyBufferBytes int
+ methodOverrideParam string
+ tokenCookieName string
+ requestMutator RequestMutatorFunc
+ headerForwarder func(header string) bool
+ pingInterval time.Duration
+ pingWait time.Duration
+ pongWait time.Duration
}
// Logger collects log messages.
@@ -50,6 +57,15 @@
// Option allows customization of the proxy.
type Option func(*Proxy)
+// WithMaxRespBodyBufferSize allows specification of a custom size for the
+// buffer used while reading the response body. By default, the bufio.Scanner
+// used to read the response body sets the maximum token size to MaxScanTokenSize.
+func WithMaxRespBodyBufferSize(nBytes int) Option {
+ return func(p *Proxy) {
+ p.maxRespBodyBufferBytes = nBytes
+ }
+}
+
// WithMethodParamOverride allows specification of the special http parameter that is used in the proxied streaming request.
func WithMethodParamOverride(param string) Option {
return func(p *Proxy) {
@@ -71,6 +87,13 @@
}
}
+// WithForwardedHeaders allows controlling which headers are forwarded.
+func WithForwardedHeaders(fn func(header string) bool) Option {
+ return func(p *Proxy) {
+ p.headerForwarder = fn
+ }
+}
+
// WithLogger allows a custom FieldLogger to be supplied
func WithLogger(logger Logger) Option {
return func(p *Proxy) {
@@ -78,6 +101,28 @@
}
}
+// WithPingControl enables websocket ping/pong keepalive. The interval
+// parameter is the time between pings. The allowed wait time for a pong
+// response is (interval * 10) / 9; each ping write is allowed one sixth of that.
+func WithPingControl(interval time.Duration) Option {
+ return func(proxy *Proxy) {
+ proxy.pingInterval = interval
+ proxy.pongWait = (interval * 10) / 9
+ proxy.pingWait = proxy.pongWait / 6
+ }
+}
+
+var defaultHeadersToForward = map[string]bool{
+ "Origin": true,
+ "origin": true,
+ "Referer": true,
+ "referer": true,
+}
+
+func defaultHeaderForwarder(header string) bool {
+ return defaultHeadersToForward[header]
+}
+
// WebsocketProxy attempts to expose the underlying handler as a bidi websocket stream with newline-delimited
// JSON as the content encoding.
//
@@ -96,6 +141,7 @@
logger: logrus.New(),
methodOverrideParam: MethodOverrideParam,
tokenCookieName: TokenCookieName,
+ headerForwarder: defaultHeaderForwarder,
}
for _, o := range opts {
o(p)
@@ -138,13 +184,18 @@
defer cancelFn()
requestBodyR, requestBodyW := io.Pipe()
- request, err := http.NewRequest(r.Method, r.URL.String(), requestBodyR)
+ request, err := http.NewRequestWithContext(r.Context(), r.Method, r.URL.String(), requestBodyR)
if err != nil {
p.logger.Warnln("error preparing request:", err)
return
}
if swsp := r.Header.Get("Sec-WebSocket-Protocol"); swsp != "" {
- request.Header.Set("Authorization", strings.Replace(swsp, "Bearer, ", "Bearer ", 1))
+ request.Header.Set("Authorization", transformSubProtocolHeader(swsp))
+ }
+ for header := range r.Header {
+ if p.headerForwarder(header) {
+ request.Header.Set(header, r.Header.Get(header))
+ }
}
// If token cookie is present, populate Authorization header from the cookie instead.
if cookie, err := r.Cookie(p.tokenCookieName); err == nil {
@@ -175,6 +226,10 @@
// read loop -- take messages from websocket and write to http request
go func() {
+ if p.pingInterval > 0 && p.pingWait > 0 && p.pongWait > 0 {
+ conn.SetReadDeadline(time.Now().Add(p.pongWait))
+ conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(p.pongWait)); return nil })
+ }
defer func() {
cancelFn()
}()
@@ -206,8 +261,38 @@
}
}
}()
+ // ping write loop
+ if p.pingInterval > 0 && p.pingWait > 0 && p.pongWait > 0 {
+ go func() {
+ ticker := time.NewTicker(p.pingInterval)
+ defer func() {
+ ticker.Stop()
+ conn.Close()
+ }()
+ for {
+ select {
+ case <-ctx.Done():
+ p.logger.Debugln("ping loop done")
+ return
+ case <-ticker.C:
+ conn.SetWriteDeadline(time.Now().Add(p.pingWait))
+ if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
+ return
+ }
+ }
+ }
+ }()
+ }
// write loop -- take messages from response and write to websocket
scanner := bufio.NewScanner(responseBodyR)
+
+ // if WithMaxRespBodyBufferSize set a positive limit (maxRespBodyBufferBytes), use custom buffer for scanner
+ var scannerBuf []byte
+ if p.maxRespBodyBufferBytes > 0 {
+ scannerBuf = make([]byte, 0, 64*1024)
+ scanner.Buffer(scannerBuf, p.maxRespBodyBufferBytes)
+ }
+
for scanner.Scan() {
if len(scanner.Bytes()) == 0 {
p.logger.Warnln("[write] empty scan", scanner.Err())
@@ -239,6 +324,17 @@
}
}
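+// transformSubProtocolHeader rewrites a "Bearer,<token>" sub-protocol value as "Bearer <token>" (or "" if "Bearer," is absent); IE and Edge do not delimit Sec-WebSocket-Protocol strings with spaces.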
+func transformSubProtocolHeader(header string) string {
+ tokens := strings.SplitN(header, "Bearer,", 2)
+
+ if len(tokens) < 2 {
+ return ""
+ }
+
+ return fmt.Sprintf("Bearer %v", strings.Trim(tokens[1], " "))
+}
+
func (w *inMemoryResponseWriter) Write(b []byte) (int, error) {
return w.Writer.Write(b)
}