[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/gorilla/websocket/conn.go b/vendor/github.com/gorilla/websocket/conn.go
index 97e1dba..ca46d2f 100644
--- a/vendor/github.com/gorilla/websocket/conn.go
+++ b/vendor/github.com/gorilla/websocket/conn.go
@@ -76,7 +76,7 @@
// is UTF-8 encoded text.
PingMessage = 9
- // PongMessage denotes a ping control message. The optional message payload
+ // PongMessage denotes a pong control message. The optional message payload
// is UTF-8 encoded text.
PongMessage = 10
)
@@ -100,9 +100,8 @@
func (e *netError) Temporary() bool { return e.temporary }
func (e *netError) Timeout() bool { return e.timeout }
-// CloseError represents close frame.
+// CloseError represents a close message.
type CloseError struct {
-
// Code is defined in RFC 6455, section 11.7.
Code int
@@ -224,6 +223,20 @@
return validReceivedCloseCodes[code] || (code >= 3000 && code <= 4999)
}
+// BufferPool represents a pool of buffers. The *sync.Pool type satisfies this
+// interface. The type of the value stored in a pool is not specified.
+type BufferPool interface {
+ // Get gets a value from the pool or returns nil if the pool is empty.
+ Get() interface{}
+ // Put adds a value to the pool.
+ Put(interface{})
+}
+
+// writePoolData is the type added to the write buffer pool. This wrapper is
+// used to prevent applications from peeking at and depending on the values
+// added to the pool.
+type writePoolData struct{ buf []byte }
+
// The Conn type represents a WebSocket connection.
type Conn struct {
conn net.Conn
@@ -231,8 +244,10 @@
subprotocol string
// Write fields
- mu chan bool // used as mutex to protect write to conn
- writeBuf []byte // frame is constructed in this buffer.
+ mu chan struct{} // used as mutex to protect write to conn
+ writeBuf []byte // frame is constructed in this buffer.
+ writePool BufferPool
+ writeBufSize int
writeDeadline time.Time
writer io.WriteCloser // the current writer returned to the application
isWriting bool // for best-effort concurrent write detection
@@ -245,10 +260,12 @@
newCompressionWriter func(io.WriteCloser, int) io.WriteCloser
// Read fields
- reader io.ReadCloser // the current reader returned to the application
- readErr error
- br *bufio.Reader
- readRemaining int64 // bytes remaining in current frame.
+ reader io.ReadCloser // the current reader returned to the application
+ readErr error
+ br *bufio.Reader
+ // bytes remaining in current frame.
+ // use setReadRemaining to safely update this value and prevent overflow
+ readRemaining int64
readFinal bool // true the current message has more frames.
readLength int64 // Message size.
readLimit int64 // Maximum message size.
@@ -264,64 +281,29 @@
newDecompressionReader func(io.Reader) io.ReadCloser
}
-func newConn(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int) *Conn {
- return newConnBRW(conn, isServer, readBufferSize, writeBufferSize, nil)
-}
+func newConn(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int, writeBufferPool BufferPool, br *bufio.Reader, writeBuf []byte) *Conn {
-type writeHook struct {
- p []byte
-}
-
-func (wh *writeHook) Write(p []byte) (int, error) {
- wh.p = p
- return len(p), nil
-}
-
-func newConnBRW(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int, brw *bufio.ReadWriter) *Conn {
- mu := make(chan bool, 1)
- mu <- true
-
- var br *bufio.Reader
- if readBufferSize == 0 && brw != nil && brw.Reader != nil {
- // Reuse the supplied bufio.Reader if the buffer has a useful size.
- // This code assumes that peek on a reader returns
- // bufio.Reader.buf[:0].
- brw.Reader.Reset(conn)
- if p, err := brw.Reader.Peek(0); err == nil && cap(p) >= 256 {
- br = brw.Reader
- }
- }
if br == nil {
if readBufferSize == 0 {
readBufferSize = defaultReadBufferSize
- }
- if readBufferSize < maxControlFramePayloadSize {
+ } else if readBufferSize < maxControlFramePayloadSize {
+ // must be large enough for control frame
readBufferSize = maxControlFramePayloadSize
}
br = bufio.NewReaderSize(conn, readBufferSize)
}
- var writeBuf []byte
- if writeBufferSize == 0 && brw != nil && brw.Writer != nil {
- // Use the bufio.Writer's buffer if the buffer has a useful size. This
- // code assumes that bufio.Writer.buf[:1] is passed to the
- // bufio.Writer's underlying writer.
- var wh writeHook
- brw.Writer.Reset(&wh)
- brw.Writer.WriteByte(0)
- brw.Flush()
- if cap(wh.p) >= maxFrameHeaderSize+256 {
- writeBuf = wh.p[:cap(wh.p)]
- }
+ if writeBufferSize <= 0 {
+ writeBufferSize = defaultWriteBufferSize
+ }
+ writeBufferSize += maxFrameHeaderSize
+
+ if writeBuf == nil && writeBufferPool == nil {
+ writeBuf = make([]byte, writeBufferSize)
}
- if writeBuf == nil {
- if writeBufferSize == 0 {
- writeBufferSize = defaultWriteBufferSize
- }
- writeBuf = make([]byte, writeBufferSize+maxFrameHeaderSize)
- }
-
+ mu := make(chan struct{}, 1)
+ mu <- struct{}{}
c := &Conn{
isServer: isServer,
br: br,
@@ -329,6 +311,8 @@
mu: mu,
readFinal: true,
writeBuf: writeBuf,
+ writePool: writeBufferPool,
+ writeBufSize: writeBufferSize,
enableWriteCompression: true,
compressionLevel: defaultCompressionLevel,
}
@@ -338,12 +322,24 @@
return c
}
+// setReadRemaining records the number of bytes remaining in the current frame.
+// If n is negative (an overflowed length), ErrReadLimit is returned and the value is unchanged.
+func (c *Conn) setReadRemaining(n int64) error {
+ if n < 0 {
+ return ErrReadLimit
+ }
+
+ c.readRemaining = n
+ return nil
+}
+
// Subprotocol returns the negotiated protocol for the connection.
func (c *Conn) Subprotocol() string {
return c.subprotocol
}
-// Close closes the underlying network connection without sending or waiting for a close frame.
+// Close closes the underlying network connection without sending or waiting
+// for a close message.
func (c *Conn) Close() error {
return c.conn.Close()
}
@@ -370,9 +366,18 @@
return err
}
-func (c *Conn) write(frameType int, deadline time.Time, bufs ...[]byte) error {
+func (c *Conn) read(n int) ([]byte, error) {
+ p, err := c.br.Peek(n)
+ if err == io.EOF {
+ err = errUnexpectedEOF
+ }
+ c.br.Discard(len(p))
+ return p, err
+}
+
+func (c *Conn) write(frameType int, deadline time.Time, buf0, buf1 []byte) error {
<-c.mu
- defer func() { c.mu <- true }()
+ defer func() { c.mu <- struct{}{} }()
c.writeErrMu.Lock()
err := c.writeErr
@@ -382,15 +387,14 @@
}
c.conn.SetWriteDeadline(deadline)
- for _, buf := range bufs {
- if len(buf) > 0 {
- _, err := c.conn.Write(buf)
- if err != nil {
- return c.writeFatal(err)
- }
- }
+ if len(buf1) == 0 {
+ _, err = c.conn.Write(buf0)
+ } else {
+ err = c.writeBufs(buf0, buf1)
}
-
+ if err != nil {
+ return c.writeFatal(err)
+ }
if frameType == CloseMessage {
c.writeFatal(ErrCloseSent)
}
@@ -425,7 +429,7 @@
maskBytes(key, 0, buf[6:])
}
- d := time.Hour * 1000
+ d := 1000 * time.Hour
if !deadline.IsZero() {
d = deadline.Sub(time.Now())
if d < 0 {
@@ -440,7 +444,7 @@
case <-timer.C:
return errWriteTimeout
}
- defer func() { c.mu <- true }()
+ defer func() { c.mu <- struct{}{} }()
c.writeErrMu.Lock()
err := c.writeErr
@@ -460,7 +464,8 @@
return err
}
-func (c *Conn) prepWrite(messageType int) error {
+// beginMessage prepares a connection and message writer for a new message.
+func (c *Conn) beginMessage(mw *messageWriter, messageType int) error {
// Close previous writer if not already closed by the application. It's
// probably better to return an error in this situation, but we cannot
// change this without breaking existing applications.
@@ -476,7 +481,23 @@
c.writeErrMu.Lock()
err := c.writeErr
c.writeErrMu.Unlock()
- return err
+ if err != nil {
+ return err
+ }
+
+ mw.c = c
+ mw.frameType = messageType
+ mw.pos = maxFrameHeaderSize
+
+ if c.writeBuf == nil {
+ wpd, ok := c.writePool.Get().(writePoolData)
+ if ok {
+ c.writeBuf = wpd.buf
+ } else {
+ c.writeBuf = make([]byte, c.writeBufSize)
+ }
+ }
+ return nil
}
// NextWriter returns a writer for the next message to send. The writer's Close
@@ -484,17 +505,15 @@
//
// There can be at most one open writer on a connection. NextWriter closes the
// previous writer if the application has not already done so.
+//
+// All message types (TextMessage, BinaryMessage, CloseMessage, PingMessage and
+// PongMessage) are supported.
func (c *Conn) NextWriter(messageType int) (io.WriteCloser, error) {
- if err := c.prepWrite(messageType); err != nil {
+ var mw messageWriter
+ if err := c.beginMessage(&mw, messageType); err != nil {
return nil, err
}
-
- mw := &messageWriter{
- c: c,
- frameType: messageType,
- pos: maxFrameHeaderSize,
- }
- c.writer = mw
+ c.writer = &mw
if c.newCompressionWriter != nil && c.enableWriteCompression && isData(messageType) {
w := c.newCompressionWriter(c.writer, c.compressionLevel)
mw.compress = true
@@ -511,10 +530,16 @@
err error
}
-func (w *messageWriter) fatal(err error) error {
+func (w *messageWriter) endMessage(err error) error {
if w.err != nil {
- w.err = err
- w.c.writer = nil
+ return err
+ }
+ c := w.c
+ w.err = err
+ c.writer = nil
+ if c.writePool != nil {
+ c.writePool.Put(writePoolData{buf: c.writeBuf})
+ c.writeBuf = nil
}
return err
}
@@ -528,7 +553,7 @@
// Check for invalid control frames.
if isControl(w.frameType) &&
(!final || length > maxControlFramePayloadSize) {
- return w.fatal(errInvalidControlFrame)
+ return w.endMessage(errInvalidControlFrame)
}
b0 := byte(w.frameType)
@@ -573,7 +598,7 @@
copy(c.writeBuf[maxFrameHeaderSize-4:], key[:])
maskBytes(key, 0, c.writeBuf[maxFrameHeaderSize:w.pos])
if len(extra) > 0 {
- return c.writeFatal(errors.New("websocket: internal error, extra used in client mode"))
+ return w.endMessage(c.writeFatal(errors.New("websocket: internal error, extra used in client mode")))
}
}
@@ -594,11 +619,11 @@
c.isWriting = false
if err != nil {
- return w.fatal(err)
+ return w.endMessage(err)
}
if final {
- c.writer = nil
+ w.endMessage(errWriteClosed)
return nil
}
@@ -696,11 +721,7 @@
if w.err != nil {
return w.err
}
- if err := w.flushFrame(true, nil); err != nil {
- return err
- }
- w.err = errWriteClosed
- return nil
+ return w.flushFrame(true, nil)
}
// WritePreparedMessage writes prepared message into connection.
@@ -732,10 +753,10 @@
if c.isServer && (c.newCompressionWriter == nil || !c.enableWriteCompression) {
// Fast path with no allocations and single frame.
- if err := c.prepWrite(messageType); err != nil {
+ var mw messageWriter
+ if err := c.beginMessage(&mw, messageType); err != nil {
return err
}
- mw := messageWriter{c: c, frameType: messageType, pos: maxFrameHeaderSize}
n := copy(c.writeBuf[mw.pos:], data)
mw.pos += n
data = data[n:]
@@ -764,7 +785,6 @@
// Read methods
func (c *Conn) advanceFrame() (int, error) {
-
// 1. Skip remainder of previous frame.
if c.readRemaining > 0 {
@@ -783,7 +803,7 @@
final := p[0]&finalBit != 0
frameType := int(p[0] & 0xf)
mask := p[1]&maskBit != 0
- c.readRemaining = int64(p[1] & 0x7f)
+ c.setReadRemaining(int64(p[1] & 0x7f))
c.readDecompress = false
if c.newDecompressionReader != nil && (p[0]&rsv1Bit) != 0 {
@@ -817,7 +837,17 @@
return noFrame, c.handleProtocolError("unknown opcode " + strconv.Itoa(frameType))
}
- // 3. Read and parse frame length.
+ // 3. Read and parse frame length as per
+ // https://tools.ietf.org/html/rfc6455#section-5.2
+ //
+ // The length of the "Payload data", in bytes:
+ // - If 0-125, that is the payload length.
+ // - If 126, the following 2 bytes interpreted as a 16-bit unsigned
+ //   integer are the payload length.
+ // - If 127, the following 8 bytes interpreted as a 64-bit unsigned
+ //   integer (the most significant bit MUST be 0) are the payload
+ //   length.
+ // Multibyte length quantities are expressed in network byte order.
switch c.readRemaining {
case 126:
@@ -825,13 +855,19 @@
if err != nil {
return noFrame, err
}
- c.readRemaining = int64(binary.BigEndian.Uint16(p))
+
+ if err := c.setReadRemaining(int64(binary.BigEndian.Uint16(p))); err != nil {
+ return noFrame, err
+ }
case 127:
p, err := c.read(8)
if err != nil {
return noFrame, err
}
- c.readRemaining = int64(binary.BigEndian.Uint64(p))
+
+ if err := c.setReadRemaining(int64(binary.BigEndian.Uint64(p))); err != nil {
+ return noFrame, err
+ }
}
// 4. Handle frame masking.
@@ -854,6 +890,12 @@
if frameType == continuationFrame || frameType == TextMessage || frameType == BinaryMessage {
c.readLength += c.readRemaining
+ // Don't allow readLength to overflow in the presence of a large readRemaining
+ // counter.
+ if c.readLength < 0 {
+ return noFrame, ErrReadLimit
+ }
+
if c.readLimit > 0 && c.readLength > c.readLimit {
c.WriteControl(CloseMessage, FormatCloseMessage(CloseMessageTooBig, ""), time.Now().Add(writeWait))
return noFrame, ErrReadLimit
@@ -867,7 +909,7 @@
var payload []byte
if c.readRemaining > 0 {
payload, err = c.read(int(c.readRemaining))
- c.readRemaining = 0
+ c.setReadRemaining(0)
if err != nil {
return noFrame, err
}
@@ -940,6 +982,7 @@
c.readErr = hideTempErr(err)
break
}
+
if frameType == TextMessage || frameType == BinaryMessage {
c.messageReader = &messageReader{c}
c.reader = c.messageReader
@@ -980,7 +1023,9 @@
if c.isServer {
c.readMaskPos = maskBytes(c.readMaskKey, c.readMaskPos, b[:n])
}
- c.readRemaining -= int64(n)
+ rem := c.readRemaining
+ rem -= int64(n)
+ c.setReadRemaining(rem)
if c.readRemaining > 0 && c.readErr == io.EOF {
c.readErr = errUnexpectedEOF
}
@@ -1032,8 +1077,8 @@
return c.conn.SetReadDeadline(t)
}
-// SetReadLimit sets the maximum size for a message read from the peer. If a
-// message exceeds the limit, the connection sends a close frame to the peer
+// SetReadLimit sets the maximum size in bytes for a message read from the peer. If a
+// message exceeds the limit, the connection sends a close message to the peer
// and returns ErrReadLimit to the application.
func (c *Conn) SetReadLimit(limit int64) {
c.readLimit = limit
@@ -1046,24 +1091,22 @@
// SetCloseHandler sets the handler for close messages received from the peer.
// The code argument to h is the received close code or CloseNoStatusReceived
-// if the close message is empty. The default close handler sends a close frame
-// back to the peer.
+// if the close message is empty. The default close handler sends a close
+// message back to the peer.
//
-// The application must read the connection to process close messages as
-// described in the section on Control Frames above.
+// The handler function is called from the NextReader, ReadMessage and message
+// reader Read methods. The application must read the connection to process
+// close messages as described in the section on Control Messages above.
//
-// The connection read methods return a CloseError when a close frame is
+// The connection read methods return a CloseError when a close message is
// received. Most applications should handle close messages as part of their
// normal error handling. Applications should only set a close handler when the
-// application must perform some action before sending a close frame back to
+// application must perform some action before sending a close message back to
// the peer.
func (c *Conn) SetCloseHandler(h func(code int, text string) error) {
if h == nil {
h = func(code int, text string) error {
- message := []byte{}
- if code != CloseNoStatusReceived {
- message = FormatCloseMessage(code, "")
- }
+ message := FormatCloseMessage(code, "")
c.WriteControl(CloseMessage, message, time.Now().Add(writeWait))
return nil
}
@@ -1077,11 +1120,12 @@
}
// SetPingHandler sets the handler for ping messages received from the peer.
-// The appData argument to h is the PING frame application data. The default
+// The appData argument to h is the PING message application data. The default
// ping handler sends a pong to the peer.
//
-// The application must read the connection to process ping messages as
-// described in the section on Control Frames above.
+// The handler function is called from the NextReader, ReadMessage and message
+// reader Read methods. The application must read the connection to process
+// ping messages as described in the section on Control Messages above.
func (c *Conn) SetPingHandler(h func(appData string) error) {
if h == nil {
h = func(message string) error {
@@ -1103,11 +1147,12 @@
}
// SetPongHandler sets the handler for pong messages received from the peer.
-// The appData argument to h is the PONG frame application data. The default
+// The appData argument to h is the PONG message application data. The default
// pong handler does nothing.
//
-// The application must read the connection to process ping messages as
-// described in the section on Control Frames above.
+// The handler function is called from the NextReader, ReadMessage and message
+// reader Read methods. The application must read the connection to process
+// pong messages as described in the section on Control Messages above.
func (c *Conn) SetPongHandler(h func(appData string) error) {
if h == nil {
h = func(string) error { return nil }
@@ -1141,7 +1186,14 @@
}
// FormatCloseMessage formats closeCode and text as a WebSocket close message.
+// An empty message is returned for code CloseNoStatusReceived.
func FormatCloseMessage(closeCode int, text string) []byte {
+ if closeCode == CloseNoStatusReceived {
+ // Return empty message because it's illegal to send
+ // CloseNoStatusReceived. Return non-nil value in case application
+ // checks for nil.
+ return []byte{}
+ }
buf := make([]byte, 2+len(text))
binary.BigEndian.PutUint16(buf, uint16(closeCode))
copy(buf[2:], text)