[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/gorilla/websocket/conn.go b/vendor/github.com/gorilla/websocket/conn.go
index 97e1dba..ca46d2f 100644
--- a/vendor/github.com/gorilla/websocket/conn.go
+++ b/vendor/github.com/gorilla/websocket/conn.go
@@ -76,7 +76,7 @@
 	// is UTF-8 encoded text.
 	PingMessage = 9
 
-	// PongMessage denotes a ping control message. The optional message payload
+	// PongMessage denotes a pong control message. The optional message payload
 	// is UTF-8 encoded text.
 	PongMessage = 10
 )
@@ -100,9 +100,8 @@
 func (e *netError) Temporary() bool { return e.temporary }
 func (e *netError) Timeout() bool   { return e.timeout }
 
-// CloseError represents close frame.
+// CloseError represents a close message.
 type CloseError struct {
-
 	// Code is defined in RFC 6455, section 11.7.
 	Code int
 
@@ -224,6 +223,20 @@
 	return validReceivedCloseCodes[code] || (code >= 3000 && code <= 4999)
 }
 
+// BufferPool represents a pool of buffers. The *sync.Pool type satisfies this
+// interface.  The type of the value stored in a pool is not specified.
+type BufferPool interface {
+	// Get gets a value from the pool or returns nil if the pool is empty.
+	Get() interface{}
+	// Put adds a value to the pool.
+	Put(interface{})
+}
+
+// writePoolData is the type added to the write buffer pool. This wrapper is
+// used to prevent applications from peeking at and depending on the values
+// added to the pool.
+type writePoolData struct{ buf []byte }
+
 // The Conn type represents a WebSocket connection.
 type Conn struct {
 	conn        net.Conn
@@ -231,8 +244,10 @@
 	subprotocol string
 
 	// Write fields
-	mu            chan bool // used as mutex to protect write to conn
-	writeBuf      []byte    // frame is constructed in this buffer.
+	mu            chan struct{} // used as mutex to protect write to conn
+	writeBuf      []byte        // frame is constructed in this buffer.
+	writePool     BufferPool
+	writeBufSize  int
 	writeDeadline time.Time
 	writer        io.WriteCloser // the current writer returned to the application
 	isWriting     bool           // for best-effort concurrent write detection
@@ -245,10 +260,12 @@
 	newCompressionWriter   func(io.WriteCloser, int) io.WriteCloser
 
 	// Read fields
-	reader        io.ReadCloser // the current reader returned to the application
-	readErr       error
-	br            *bufio.Reader
-	readRemaining int64 // bytes remaining in current frame.
+	reader  io.ReadCloser // the current reader returned to the application
+	readErr error
+	br      *bufio.Reader
+	// bytes remaining in current frame.
+	// use setReadRemaining to safely update this value and prevent overflow
+	readRemaining int64
 	readFinal     bool  // true the current message has more frames.
 	readLength    int64 // Message size.
 	readLimit     int64 // Maximum message size.
@@ -264,64 +281,29 @@
 	newDecompressionReader func(io.Reader) io.ReadCloser
 }
 
-func newConn(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int) *Conn {
-	return newConnBRW(conn, isServer, readBufferSize, writeBufferSize, nil)
-}
+func newConn(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int, writeBufferPool BufferPool, br *bufio.Reader, writeBuf []byte) *Conn {
 
-type writeHook struct {
-	p []byte
-}
-
-func (wh *writeHook) Write(p []byte) (int, error) {
-	wh.p = p
-	return len(p), nil
-}
-
-func newConnBRW(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int, brw *bufio.ReadWriter) *Conn {
-	mu := make(chan bool, 1)
-	mu <- true
-
-	var br *bufio.Reader
-	if readBufferSize == 0 && brw != nil && brw.Reader != nil {
-		// Reuse the supplied bufio.Reader if the buffer has a useful size.
-		// This code assumes that peek on a reader returns
-		// bufio.Reader.buf[:0].
-		brw.Reader.Reset(conn)
-		if p, err := brw.Reader.Peek(0); err == nil && cap(p) >= 256 {
-			br = brw.Reader
-		}
-	}
 	if br == nil {
 		if readBufferSize == 0 {
 			readBufferSize = defaultReadBufferSize
-		}
-		if readBufferSize < maxControlFramePayloadSize {
+		} else if readBufferSize < maxControlFramePayloadSize {
+			// must be large enough for control frame
 			readBufferSize = maxControlFramePayloadSize
 		}
 		br = bufio.NewReaderSize(conn, readBufferSize)
 	}
 
-	var writeBuf []byte
-	if writeBufferSize == 0 && brw != nil && brw.Writer != nil {
-		// Use the bufio.Writer's buffer if the buffer has a useful size. This
-		// code assumes that bufio.Writer.buf[:1] is passed to the
-		// bufio.Writer's underlying writer.
-		var wh writeHook
-		brw.Writer.Reset(&wh)
-		brw.Writer.WriteByte(0)
-		brw.Flush()
-		if cap(wh.p) >= maxFrameHeaderSize+256 {
-			writeBuf = wh.p[:cap(wh.p)]
-		}
+	if writeBufferSize <= 0 {
+		writeBufferSize = defaultWriteBufferSize
+	}
+	writeBufferSize += maxFrameHeaderSize
+
+	if writeBuf == nil && writeBufferPool == nil {
+		writeBuf = make([]byte, writeBufferSize)
 	}
 
-	if writeBuf == nil {
-		if writeBufferSize == 0 {
-			writeBufferSize = defaultWriteBufferSize
-		}
-		writeBuf = make([]byte, writeBufferSize+maxFrameHeaderSize)
-	}
-
+	mu := make(chan struct{}, 1)
+	mu <- struct{}{}
 	c := &Conn{
 		isServer:               isServer,
 		br:                     br,
@@ -329,6 +311,8 @@
 		mu:                     mu,
 		readFinal:              true,
 		writeBuf:               writeBuf,
+		writePool:              writeBufferPool,
+		writeBufSize:           writeBufferSize,
 		enableWriteCompression: true,
 		compressionLevel:       defaultCompressionLevel,
 	}
@@ -338,12 +322,24 @@
 	return c
 }
 
+// setReadRemaining stores n as the bytes remaining in the current frame. If n
+// is negative (the length overflowed int64), ErrReadLimit is returned instead.
+func (c *Conn) setReadRemaining(n int64) error {
+	if n < 0 {
+		return ErrReadLimit
+	}
+
+	c.readRemaining = n
+	return nil
+}
+
 // Subprotocol returns the negotiated protocol for the connection.
 func (c *Conn) Subprotocol() string {
 	return c.subprotocol
 }
 
-// Close closes the underlying network connection without sending or waiting for a close frame.
+// Close closes the underlying network connection without sending or waiting
+// for a close message.
 func (c *Conn) Close() error {
 	return c.conn.Close()
 }
@@ -370,9 +366,18 @@
 	return err
 }
 
-func (c *Conn) write(frameType int, deadline time.Time, bufs ...[]byte) error {
+func (c *Conn) read(n int) ([]byte, error) {
+	p, err := c.br.Peek(n)
+	if err == io.EOF {
+		err = errUnexpectedEOF
+	}
+	c.br.Discard(len(p))
+	return p, err
+}
+
+func (c *Conn) write(frameType int, deadline time.Time, buf0, buf1 []byte) error {
 	<-c.mu
-	defer func() { c.mu <- true }()
+	defer func() { c.mu <- struct{}{} }()
 
 	c.writeErrMu.Lock()
 	err := c.writeErr
@@ -382,15 +387,14 @@
 	}
 
 	c.conn.SetWriteDeadline(deadline)
-	for _, buf := range bufs {
-		if len(buf) > 0 {
-			_, err := c.conn.Write(buf)
-			if err != nil {
-				return c.writeFatal(err)
-			}
-		}
+	if len(buf1) == 0 {
+		_, err = c.conn.Write(buf0)
+	} else {
+		err = c.writeBufs(buf0, buf1)
 	}
-
+	if err != nil {
+		return c.writeFatal(err)
+	}
 	if frameType == CloseMessage {
 		c.writeFatal(ErrCloseSent)
 	}
@@ -425,7 +429,7 @@
 		maskBytes(key, 0, buf[6:])
 	}
 
-	d := time.Hour * 1000
+	d := 1000 * time.Hour
 	if !deadline.IsZero() {
 		d = deadline.Sub(time.Now())
 		if d < 0 {
@@ -440,7 +444,7 @@
 	case <-timer.C:
 		return errWriteTimeout
 	}
-	defer func() { c.mu <- true }()
+	defer func() { c.mu <- struct{}{} }()
 
 	c.writeErrMu.Lock()
 	err := c.writeErr
@@ -460,7 +464,8 @@
 	return err
 }
 
-func (c *Conn) prepWrite(messageType int) error {
+// beginMessage prepares a connection and message writer for a new message.
+func (c *Conn) beginMessage(mw *messageWriter, messageType int) error {
 	// Close previous writer if not already closed by the application. It's
 	// probably better to return an error in this situation, but we cannot
 	// change this without breaking existing applications.
@@ -476,7 +481,23 @@
 	c.writeErrMu.Lock()
 	err := c.writeErr
 	c.writeErrMu.Unlock()
-	return err
+	if err != nil {
+		return err
+	}
+
+	mw.c = c
+	mw.frameType = messageType
+	mw.pos = maxFrameHeaderSize
+
+	if c.writeBuf == nil {
+		wpd, ok := c.writePool.Get().(writePoolData)
+		if ok {
+			c.writeBuf = wpd.buf
+		} else {
+			c.writeBuf = make([]byte, c.writeBufSize)
+		}
+	}
+	return nil
 }
 
 // NextWriter returns a writer for the next message to send. The writer's Close
@@ -484,17 +505,15 @@
 //
 // There can be at most one open writer on a connection. NextWriter closes the
 // previous writer if the application has not already done so.
+//
+// All message types (TextMessage, BinaryMessage, CloseMessage, PingMessage and
+// PongMessage) are supported.
 func (c *Conn) NextWriter(messageType int) (io.WriteCloser, error) {
-	if err := c.prepWrite(messageType); err != nil {
+	var mw messageWriter
+	if err := c.beginMessage(&mw, messageType); err != nil {
 		return nil, err
 	}
-
-	mw := &messageWriter{
-		c:         c,
-		frameType: messageType,
-		pos:       maxFrameHeaderSize,
-	}
-	c.writer = mw
+	c.writer = &mw
 	if c.newCompressionWriter != nil && c.enableWriteCompression && isData(messageType) {
 		w := c.newCompressionWriter(c.writer, c.compressionLevel)
 		mw.compress = true
@@ -511,10 +530,16 @@
 	err       error
 }
 
-func (w *messageWriter) fatal(err error) error {
+func (w *messageWriter) endMessage(err error) error {
 	if w.err != nil {
-		w.err = err
-		w.c.writer = nil
+		return err
+	}
+	c := w.c
+	w.err = err
+	c.writer = nil
+	if c.writePool != nil {
+		c.writePool.Put(writePoolData{buf: c.writeBuf})
+		c.writeBuf = nil
 	}
 	return err
 }
@@ -528,7 +553,7 @@
 	// Check for invalid control frames.
 	if isControl(w.frameType) &&
 		(!final || length > maxControlFramePayloadSize) {
-		return w.fatal(errInvalidControlFrame)
+		return w.endMessage(errInvalidControlFrame)
 	}
 
 	b0 := byte(w.frameType)
@@ -573,7 +598,7 @@
 		copy(c.writeBuf[maxFrameHeaderSize-4:], key[:])
 		maskBytes(key, 0, c.writeBuf[maxFrameHeaderSize:w.pos])
 		if len(extra) > 0 {
-			return c.writeFatal(errors.New("websocket: internal error, extra used in client mode"))
+			return w.endMessage(c.writeFatal(errors.New("websocket: internal error, extra used in client mode")))
 		}
 	}
 
@@ -594,11 +619,11 @@
 	c.isWriting = false
 
 	if err != nil {
-		return w.fatal(err)
+		return w.endMessage(err)
 	}
 
 	if final {
-		c.writer = nil
+		w.endMessage(errWriteClosed)
 		return nil
 	}
 
@@ -696,11 +721,7 @@
 	if w.err != nil {
 		return w.err
 	}
-	if err := w.flushFrame(true, nil); err != nil {
-		return err
-	}
-	w.err = errWriteClosed
-	return nil
+	return w.flushFrame(true, nil)
 }
 
 // WritePreparedMessage writes prepared message into connection.
@@ -732,10 +753,10 @@
 	if c.isServer && (c.newCompressionWriter == nil || !c.enableWriteCompression) {
 		// Fast path with no allocations and single frame.
 
-		if err := c.prepWrite(messageType); err != nil {
+		var mw messageWriter
+		if err := c.beginMessage(&mw, messageType); err != nil {
 			return err
 		}
-		mw := messageWriter{c: c, frameType: messageType, pos: maxFrameHeaderSize}
 		n := copy(c.writeBuf[mw.pos:], data)
 		mw.pos += n
 		data = data[n:]
@@ -764,7 +785,6 @@
 // Read methods
 
 func (c *Conn) advanceFrame() (int, error) {
-
 	// 1. Skip remainder of previous frame.
 
 	if c.readRemaining > 0 {
@@ -783,7 +803,7 @@
 	final := p[0]&finalBit != 0
 	frameType := int(p[0] & 0xf)
 	mask := p[1]&maskBit != 0
-	c.readRemaining = int64(p[1] & 0x7f)
+	c.setReadRemaining(int64(p[1] & 0x7f))
 
 	c.readDecompress = false
 	if c.newDecompressionReader != nil && (p[0]&rsv1Bit) != 0 {
@@ -817,7 +837,17 @@
 		return noFrame, c.handleProtocolError("unknown opcode " + strconv.Itoa(frameType))
 	}
 
-	// 3. Read and parse frame length.
+	// 3. Read and parse frame length as per
+	// https://tools.ietf.org/html/rfc6455#section-5.2
+	//
+	// The length of the "Payload data", in bytes: if 0-125, that is the payload
+	// length.
+	// - If 126, the following 2 bytes interpreted as a 16-bit unsigned
+	// integer are the payload length.
+	// - If 127, the following 8 bytes interpreted as
+	// a 64-bit unsigned integer (the most significant bit MUST be 0) are the
+	// payload length. Multibyte length quantities are expressed in network byte
+	// order.
 
 	switch c.readRemaining {
 	case 126:
@@ -825,13 +855,19 @@
 		if err != nil {
 			return noFrame, err
 		}
-		c.readRemaining = int64(binary.BigEndian.Uint16(p))
+
+		if err := c.setReadRemaining(int64(binary.BigEndian.Uint16(p))); err != nil {
+			return noFrame, err
+		}
 	case 127:
 		p, err := c.read(8)
 		if err != nil {
 			return noFrame, err
 		}
-		c.readRemaining = int64(binary.BigEndian.Uint64(p))
+
+		if err := c.setReadRemaining(int64(binary.BigEndian.Uint64(p))); err != nil {
+			return noFrame, err
+		}
 	}
 
 	// 4. Handle frame masking.
@@ -854,6 +890,12 @@
 	if frameType == continuationFrame || frameType == TextMessage || frameType == BinaryMessage {
 
 		c.readLength += c.readRemaining
+		// Don't allow readLength to overflow in the presence of a large readRemaining
+		// counter.
+		if c.readLength < 0 {
+			return noFrame, ErrReadLimit
+		}
+
 		if c.readLimit > 0 && c.readLength > c.readLimit {
 			c.WriteControl(CloseMessage, FormatCloseMessage(CloseMessageTooBig, ""), time.Now().Add(writeWait))
 			return noFrame, ErrReadLimit
@@ -867,7 +909,7 @@
 	var payload []byte
 	if c.readRemaining > 0 {
 		payload, err = c.read(int(c.readRemaining))
-		c.readRemaining = 0
+		c.setReadRemaining(0)
 		if err != nil {
 			return noFrame, err
 		}
@@ -940,6 +982,7 @@
 			c.readErr = hideTempErr(err)
 			break
 		}
+
 		if frameType == TextMessage || frameType == BinaryMessage {
 			c.messageReader = &messageReader{c}
 			c.reader = c.messageReader
@@ -980,7 +1023,9 @@
 			if c.isServer {
 				c.readMaskPos = maskBytes(c.readMaskKey, c.readMaskPos, b[:n])
 			}
-			c.readRemaining -= int64(n)
+			rem := c.readRemaining
+			rem -= int64(n)
+			c.setReadRemaining(rem)
 			if c.readRemaining > 0 && c.readErr == io.EOF {
 				c.readErr = errUnexpectedEOF
 			}
@@ -1032,8 +1077,8 @@
 	return c.conn.SetReadDeadline(t)
 }
 
-// SetReadLimit sets the maximum size for a message read from the peer. If a
-// message exceeds the limit, the connection sends a close frame to the peer
+// SetReadLimit sets the maximum size in bytes for a message read from the peer. If a
+// message exceeds the limit, the connection sends a close message to the peer
 // and returns ErrReadLimit to the application.
 func (c *Conn) SetReadLimit(limit int64) {
 	c.readLimit = limit
@@ -1046,24 +1091,22 @@
 
 // SetCloseHandler sets the handler for close messages received from the peer.
 // The code argument to h is the received close code or CloseNoStatusReceived
-// if the close message is empty. The default close handler sends a close frame
-// back to the peer.
+// if the close message is empty. The default close handler sends a close
+// message back to the peer.
 //
-// The application must read the connection to process close messages as
-// described in the section on Control Frames above.
+// The handler function is called from the NextReader, ReadMessage and message
+// reader Read methods. The application must read the connection to process
+// close messages as described in the section on Control Messages above.
 //
-// The connection read methods return a CloseError when a close frame is
+// The connection read methods return a CloseError when a close message is
 // received. Most applications should handle close messages as part of their
 // normal error handling. Applications should only set a close handler when the
-// application must perform some action before sending a close frame back to
+// application must perform some action before sending a close message back to
 // the peer.
 func (c *Conn) SetCloseHandler(h func(code int, text string) error) {
 	if h == nil {
 		h = func(code int, text string) error {
-			message := []byte{}
-			if code != CloseNoStatusReceived {
-				message = FormatCloseMessage(code, "")
-			}
+			message := FormatCloseMessage(code, "")
 			c.WriteControl(CloseMessage, message, time.Now().Add(writeWait))
 			return nil
 		}
@@ -1077,11 +1120,12 @@
 }
 
 // SetPingHandler sets the handler for ping messages received from the peer.
-// The appData argument to h is the PING frame application data. The default
+// The appData argument to h is the PING message application data. The default
 // ping handler sends a pong to the peer.
 //
-// The application must read the connection to process ping messages as
-// described in the section on Control Frames above.
+// The handler function is called from the NextReader, ReadMessage and message
+// reader Read methods. The application must read the connection to process
+// ping messages as described in the section on Control Messages above.
 func (c *Conn) SetPingHandler(h func(appData string) error) {
 	if h == nil {
 		h = func(message string) error {
@@ -1103,11 +1147,12 @@
 }
 
 // SetPongHandler sets the handler for pong messages received from the peer.
-// The appData argument to h is the PONG frame application data. The default
+// The appData argument to h is the PONG message application data. The default
 // pong handler does nothing.
 //
-// The application must read the connection to process ping messages as
-// described in the section on Control Frames above.
+// The handler function is called from the NextReader, ReadMessage and message
+// reader Read methods. The application must read the connection to process
+// pong messages as described in the section on Control Messages above.
 func (c *Conn) SetPongHandler(h func(appData string) error) {
 	if h == nil {
 		h = func(string) error { return nil }
@@ -1141,7 +1186,14 @@
 }
 
 // FormatCloseMessage formats closeCode and text as a WebSocket close message.
+// An empty message is returned for code CloseNoStatusReceived.
 func FormatCloseMessage(closeCode int, text string) []byte {
+	if closeCode == CloseNoStatusReceived {
+		// Return empty message because it's illegal to send
+		// CloseNoStatusReceived. Return non-nil value in case application
+		// checks for nil.
+		return []byte{}
+	}
 	buf := make([]byte, 2+len(text))
 	binary.BigEndian.PutUint16(buf, uint16(closeCode))
 	copy(buf[2:], text)