| // Copyright 2013 The Gorilla WebSocket Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package websocket |
| |
| import ( |
| "bufio" |
| "encoding/binary" |
| "errors" |
| "io" |
| "io/ioutil" |
| "math/rand" |
| "net" |
| "strconv" |
| "sync" |
| "time" |
| "unicode/utf8" |
| ) |
| |
const (
	// Frame header byte 0 bits from Section 5.2 of RFC 6455.
	finalBit = 1 << 7 // FIN: set on the last frame of a message
	rsv1Bit  = 1 << 6 // RSV1: set on the first frame of a compressed message
	rsv2Bit  = 1 << 5
	rsv3Bit  = 1 << 4

	// Frame header byte 1 bits from Section 5.2 of RFC 6455.
	maskBit = 1 << 7

	maxFrameHeaderSize         = 2 + 8 + 4 // Fixed header + length + mask
	maxControlFramePayloadSize = 125

	// writeWait is the time allowed for control frames that the read path
	// sends on its own (protocol-error closes and the default close and ping
	// handlers).
	writeWait = time.Second

	defaultReadBufferSize  = 4096
	defaultWriteBufferSize = 4096

	// continuationFrame is the opcode of a continuation frame; noFrame is the
	// frame type returned together with an error when no frame is available.
	continuationFrame = 0
	noFrame           = -1
)
| |
// Close codes defined in RFC 6455, section 11.7.
//
// CloseNoStatusReceived, CloseAbnormalClosure and CloseTLSHandshake are marked
// as not valid to receive from a peer in validReceivedCloseCodes.
const (
	CloseNormalClosure           = 1000
	CloseGoingAway               = 1001
	CloseProtocolError           = 1002
	CloseUnsupportedData         = 1003
	CloseNoStatusReceived        = 1005
	CloseAbnormalClosure         = 1006
	CloseInvalidFramePayloadData = 1007
	ClosePolicyViolation         = 1008
	CloseMessageTooBig           = 1009
	CloseMandatoryExtension      = 1010
	CloseInternalServerErr       = 1011
	CloseServiceRestart          = 1012
	CloseTryAgainLater           = 1013
	CloseTLSHandshake            = 1015
)
| |
// The message types are defined in RFC 6455, section 11.8. The numeric values
// are the frame opcodes written into the low four bits of the first header
// byte.
const (
	// TextMessage denotes a text data message. The text message payload is
	// interpreted as UTF-8 encoded text data.
	TextMessage = 1

	// BinaryMessage denotes a binary data message.
	BinaryMessage = 2

	// CloseMessage denotes a close control message. The optional message
	// payload contains a numeric code and text. Use the FormatCloseMessage
	// function to format a close message payload.
	CloseMessage = 8

	// PingMessage denotes a ping control message. The optional message payload
	// is UTF-8 encoded text.
	PingMessage = 9

	// PongMessage denotes a pong control message. The optional message payload
	// is UTF-8 encoded text.
	PongMessage = 10
)
| |
// ErrCloseSent is returned when the application writes a message to the
// connection after sending a close message. It is recorded as the
// connection's write error as soon as a CloseMessage frame has been written.
var ErrCloseSent = errors.New("websocket: close sent")

// ErrReadLimit is returned when reading a message that is larger than the
// read limit set for the connection.
var ErrReadLimit = errors.New("websocket: read limit exceeded")
| |
// netError is a small error value that implements the net.Error interface
// with fixed answers for Temporary and Timeout.
type netError struct {
	msg       string // text returned by Error
	temporary bool   // value returned by Temporary
	timeout   bool   // value returned by Timeout
}

// Error returns the stored message.
func (e *netError) Error() string {
	return e.msg
}

// Temporary reports the stored temporary flag.
func (e *netError) Temporary() bool {
	return e.temporary
}

// Timeout reports the stored timeout flag.
func (e *netError) Timeout() bool {
	return e.timeout
}
| |
| // CloseError represents close frame. |
| type CloseError struct { |
| |
| // Code is defined in RFC 6455, section 11.7. |
| Code int |
| |
| // Text is the optional text payload. |
| Text string |
| } |
| |
| func (e *CloseError) Error() string { |
| s := []byte("websocket: close ") |
| s = strconv.AppendInt(s, int64(e.Code), 10) |
| switch e.Code { |
| case CloseNormalClosure: |
| s = append(s, " (normal)"...) |
| case CloseGoingAway: |
| s = append(s, " (going away)"...) |
| case CloseProtocolError: |
| s = append(s, " (protocol error)"...) |
| case CloseUnsupportedData: |
| s = append(s, " (unsupported data)"...) |
| case CloseNoStatusReceived: |
| s = append(s, " (no status)"...) |
| case CloseAbnormalClosure: |
| s = append(s, " (abnormal closure)"...) |
| case CloseInvalidFramePayloadData: |
| s = append(s, " (invalid payload data)"...) |
| case ClosePolicyViolation: |
| s = append(s, " (policy violation)"...) |
| case CloseMessageTooBig: |
| s = append(s, " (message too big)"...) |
| case CloseMandatoryExtension: |
| s = append(s, " (mandatory extension missing)"...) |
| case CloseInternalServerErr: |
| s = append(s, " (internal server error)"...) |
| case CloseTLSHandshake: |
| s = append(s, " (TLS handshake error)"...) |
| } |
| if e.Text != "" { |
| s = append(s, ": "...) |
| s = append(s, e.Text...) |
| } |
| return string(s) |
| } |
| |
| // IsCloseError returns boolean indicating whether the error is a *CloseError |
| // with one of the specified codes. |
| func IsCloseError(err error, codes ...int) bool { |
| if e, ok := err.(*CloseError); ok { |
| for _, code := range codes { |
| if e.Code == code { |
| return true |
| } |
| } |
| } |
| return false |
| } |
| |
| // IsUnexpectedCloseError returns boolean indicating whether the error is a |
| // *CloseError with a code not in the list of expected codes. |
| func IsUnexpectedCloseError(err error, expectedCodes ...int) bool { |
| if e, ok := err.(*CloseError); ok { |
| for _, code := range expectedCodes { |
| if e.Code == code { |
| return false |
| } |
| } |
| return true |
| } |
| return false |
| } |
| |
// Package-internal error values.
var (
	// errWriteTimeout is returned by WriteControl when the deadline has
	// already passed or the write lock is not obtained in time.
	errWriteTimeout = &netError{msg: "websocket: write timeout", timeout: true, temporary: true}
	// errUnexpectedEOF is reported by the message reader when the stream ends
	// in the middle of a message.
	errUnexpectedEOF = &CloseError{Code: CloseAbnormalClosure, Text: io.ErrUnexpectedEOF.Error()}
	// errBadWriteOpCode is returned for message types that may not be written.
	errBadWriteOpCode = errors.New("websocket: bad write message type")
	// errWriteClosed is stored in a message writer once it has been closed.
	errWriteClosed = errors.New("websocket: write closed")
	// errInvalidControlFrame is returned for control frames that are too
	// large or not final.
	errInvalidControlFrame = errors.New("websocket: invalid control frame")
)
| |
// newMaskKey returns a four byte masking key made from one pseudo-random
// 32-bit value, least significant byte first.
func newMaskKey() [4]byte {
	var key [4]byte
	binary.LittleEndian.PutUint32(key[:], rand.Uint32())
	return key
}
| |
| func hideTempErr(err error) error { |
| if e, ok := err.(net.Error); ok && e.Temporary() { |
| err = &netError{msg: e.Error(), timeout: e.Timeout()} |
| } |
| return err |
| } |
| |
| func isControl(frameType int) bool { |
| return frameType == CloseMessage || frameType == PingMessage || frameType == PongMessage |
| } |
| |
| func isData(frameType int) bool { |
| return frameType == TextMessage || frameType == BinaryMessage |
| } |
| |
// validReceivedCloseCodes records, for each close code declared in this
// package, whether it is acceptable in a close frame received from the peer.
// Codes absent from the map read as false.
var validReceivedCloseCodes = map[int]bool{
	// see http://www.iana.org/assignments/websocket/websocket.xhtml#close-code-number

	CloseNormalClosure:           true,
	CloseGoingAway:               true,
	CloseProtocolError:           true,
	CloseUnsupportedData:         true,
	CloseNoStatusReceived:        false,
	CloseAbnormalClosure:         false,
	CloseInvalidFramePayloadData: true,
	ClosePolicyViolation:         true,
	CloseMessageTooBig:           true,
	CloseMandatoryExtension:      true,
	CloseInternalServerErr:       true,
	CloseServiceRestart:          true,
	CloseTryAgainLater:           true,
	CloseTLSHandshake:            false,
}
| |
| func isValidReceivedCloseCode(code int) bool { |
| return validReceivedCloseCodes[code] || (code >= 3000 && code <= 4999) |
| } |
| |
// The Conn type represents a WebSocket connection.
type Conn struct {
	conn        net.Conn
	isServer    bool // true on the server side: outgoing frames are unmasked, incoming frames must be masked
	subprotocol string

	// Write fields
	mu            chan bool // used as mutex to protect write to conn
	writeBuf      []byte    // frame is constructed in this buffer.
	writeDeadline time.Time
	writer        io.WriteCloser // the current writer returned to the application
	isWriting     bool           // for best-effort concurrent write detection

	writeErrMu sync.Mutex // guards writeErr
	writeErr   error      // first fatal write error; once set, later writes return it

	enableWriteCompression bool
	compressionLevel       int
	newCompressionWriter   func(io.WriteCloser, int) io.WriteCloser

	// Read fields
	reader        io.ReadCloser // the current reader returned to the application
	readErr       error         // first read error; once set, later reads return it
	br            *bufio.Reader
	readRemaining int64 // bytes remaining in current frame.
	readFinal     bool  // true if the most recent data frame had the final bit set, i.e. the current message has no more frames.
	readLength    int64 // Message size.
	readLimit     int64 // Maximum message size.
	readMaskPos   int
	readMaskKey   [4]byte
	handlePong    func(string) error
	handlePing    func(string) error
	handleClose   func(int, string) error
	readErrCount  int            // number of NextReader calls that returned an error
	messageReader *messageReader // the current low-level reader

	readDecompress         bool // whether last read frame had RSV1 set
	newDecompressionReader func(io.Reader) io.ReadCloser
}
| |
// newConn creates a Conn for conn with the given buffer sizes. It is
// newConnBRW without a bufio.ReadWriter to borrow buffers from.
func newConn(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int) *Conn {
	return newConnBRW(conn, isServer, readBufferSize, writeBufferSize, nil)
}
| |
// writeHook is an io.Writer that remembers the slice handed to its most
// recent Write call. newConnBRW uses it to get hold of a bufio.Writer's
// internal buffer.
type writeHook struct {
	p []byte // slice passed to the last Write
}

// Write keeps a reference to buf (no copy is made) and reports all of it as
// written.
func (wh *writeHook) Write(buf []byte) (int, error) {
	wh.p = buf
	written := len(buf)
	return written, nil
}
| |
// newConnBRW creates a Conn around conn. isServer is stored on the Conn. A
// readBufferSize or writeBufferSize of zero means: reuse the matching buffer
// from brw when brw supplies one that is large enough, otherwise fall back to
// the package default size. brw may be nil.
//
// The returned Conn has write compression enabled, the default compression
// level, and the default close, ping and pong handlers installed.
func newConnBRW(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int, brw *bufio.ReadWriter) *Conn {
	// mu is a one-slot channel used as the write lock; the value placed in it
	// here is the token a writer takes to hold the lock.
	mu := make(chan bool, 1)
	mu <- true

	var br *bufio.Reader
	if readBufferSize == 0 && brw != nil && brw.Reader != nil {
		// Reuse the supplied bufio.Reader if the buffer has a useful size.
		// This code assumes that peek on a reader returns
		// bufio.Reader.buf[:0].
		brw.Reader.Reset(conn)
		if p, err := brw.Reader.Peek(0); err == nil && cap(p) >= 256 {
			br = brw.Reader
		}
	}
	if br == nil {
		if readBufferSize == 0 {
			readBufferSize = defaultReadBufferSize
		}
		// Never allocate a read buffer smaller than the largest control
		// frame payload.
		if readBufferSize < maxControlFramePayloadSize {
			readBufferSize = maxControlFramePayloadSize
		}
		br = bufio.NewReaderSize(conn, readBufferSize)
	}

	var writeBuf []byte
	if writeBufferSize == 0 && brw != nil && brw.Writer != nil {
		// Use the bufio.Writer's buffer if the buffer has a useful size. This
		// code assumes that bufio.Writer.buf[:1] is passed to the
		// bufio.Writer's underlying writer.
		var wh writeHook
		brw.Writer.Reset(&wh)
		brw.Writer.WriteByte(0)
		brw.Flush()
		if cap(wh.p) >= maxFrameHeaderSize+256 {
			writeBuf = wh.p[:cap(wh.p)]
		}
	}

	if writeBuf == nil {
		if writeBufferSize == 0 {
			writeBufferSize = defaultWriteBufferSize
		}
		// The header space is added on top of the requested payload size.
		writeBuf = make([]byte, writeBufferSize+maxFrameHeaderSize)
	}

	c := &Conn{
		isServer:               isServer,
		br:                     br,
		conn:                   conn,
		mu:                     mu,
		readFinal:              true,
		writeBuf:               writeBuf,
		enableWriteCompression: true,
		compressionLevel:       defaultCompressionLevel,
	}
	// Passing nil installs the default handlers.
	c.SetCloseHandler(nil)
	c.SetPingHandler(nil)
	c.SetPongHandler(nil)
	return c
}
| |
// Subprotocol returns the negotiated protocol for the connection.
func (c *Conn) Subprotocol() string {
	return c.subprotocol
}

// Close closes the underlying network connection without sending or waiting
// for a close frame. It returns the error from the underlying connection's
// Close method.
func (c *Conn) Close() error {
	return c.conn.Close()
}

// LocalAddr returns the local network address of the underlying connection.
func (c *Conn) LocalAddr() net.Addr {
	return c.conn.LocalAddr()
}

// RemoteAddr returns the remote network address of the underlying connection.
func (c *Conn) RemoteAddr() net.Addr {
	return c.conn.RemoteAddr()
}
| |
| // Write methods |
| |
| func (c *Conn) writeFatal(err error) error { |
| err = hideTempErr(err) |
| c.writeErrMu.Lock() |
| if c.writeErr == nil { |
| c.writeErr = err |
| } |
| c.writeErrMu.Unlock() |
| return err |
| } |
| |
| func (c *Conn) write(frameType int, deadline time.Time, bufs ...[]byte) error { |
| <-c.mu |
| defer func() { c.mu <- true }() |
| |
| c.writeErrMu.Lock() |
| err := c.writeErr |
| c.writeErrMu.Unlock() |
| if err != nil { |
| return err |
| } |
| |
| c.conn.SetWriteDeadline(deadline) |
| for _, buf := range bufs { |
| if len(buf) > 0 { |
| _, err := c.conn.Write(buf) |
| if err != nil { |
| return c.writeFatal(err) |
| } |
| } |
| } |
| |
| if frameType == CloseMessage { |
| c.writeFatal(ErrCloseSent) |
| } |
| return nil |
| } |
| |
| // WriteControl writes a control message with the given deadline. The allowed |
| // message types are CloseMessage, PingMessage and PongMessage. |
| func (c *Conn) WriteControl(messageType int, data []byte, deadline time.Time) error { |
| if !isControl(messageType) { |
| return errBadWriteOpCode |
| } |
| if len(data) > maxControlFramePayloadSize { |
| return errInvalidControlFrame |
| } |
| |
| b0 := byte(messageType) | finalBit |
| b1 := byte(len(data)) |
| if !c.isServer { |
| b1 |= maskBit |
| } |
| |
| buf := make([]byte, 0, maxFrameHeaderSize+maxControlFramePayloadSize) |
| buf = append(buf, b0, b1) |
| |
| if c.isServer { |
| buf = append(buf, data...) |
| } else { |
| key := newMaskKey() |
| buf = append(buf, key[:]...) |
| buf = append(buf, data...) |
| maskBytes(key, 0, buf[6:]) |
| } |
| |
| d := time.Hour * 1000 |
| if !deadline.IsZero() { |
| d = deadline.Sub(time.Now()) |
| if d < 0 { |
| return errWriteTimeout |
| } |
| } |
| |
| timer := time.NewTimer(d) |
| select { |
| case <-c.mu: |
| timer.Stop() |
| case <-timer.C: |
| return errWriteTimeout |
| } |
| defer func() { c.mu <- true }() |
| |
| c.writeErrMu.Lock() |
| err := c.writeErr |
| c.writeErrMu.Unlock() |
| if err != nil { |
| return err |
| } |
| |
| c.conn.SetWriteDeadline(deadline) |
| _, err = c.conn.Write(buf) |
| if err != nil { |
| return c.writeFatal(err) |
| } |
| if messageType == CloseMessage { |
| c.writeFatal(ErrCloseSent) |
| } |
| return err |
| } |
| |
| func (c *Conn) prepWrite(messageType int) error { |
| // Close previous writer if not already closed by the application. It's |
| // probably better to return an error in this situation, but we cannot |
| // change this without breaking existing applications. |
| if c.writer != nil { |
| c.writer.Close() |
| c.writer = nil |
| } |
| |
| if !isControl(messageType) && !isData(messageType) { |
| return errBadWriteOpCode |
| } |
| |
| c.writeErrMu.Lock() |
| err := c.writeErr |
| c.writeErrMu.Unlock() |
| return err |
| } |
| |
| // NextWriter returns a writer for the next message to send. The writer's Close |
| // method flushes the complete message to the network. |
| // |
| // There can be at most one open writer on a connection. NextWriter closes the |
| // previous writer if the application has not already done so. |
| func (c *Conn) NextWriter(messageType int) (io.WriteCloser, error) { |
| if err := c.prepWrite(messageType); err != nil { |
| return nil, err |
| } |
| |
| mw := &messageWriter{ |
| c: c, |
| frameType: messageType, |
| pos: maxFrameHeaderSize, |
| } |
| c.writer = mw |
| if c.newCompressionWriter != nil && c.enableWriteCompression && isData(messageType) { |
| w := c.newCompressionWriter(c.writer, c.compressionLevel) |
| mw.compress = true |
| c.writer = w |
| } |
| return c.writer, nil |
| } |
| |
| type messageWriter struct { |
| c *Conn |
| compress bool // whether next call to flushFrame should set RSV1 |
| pos int // end of data in writeBuf. |
| frameType int // type of the current frame. |
| err error |
| } |
| |
| func (w *messageWriter) fatal(err error) error { |
| if w.err != nil { |
| w.err = err |
| w.c.writer = nil |
| } |
| return err |
| } |
| |
// flushFrame writes buffered data and extra as a frame to the network. The
// final argument indicates that this is the last frame in the message.
//
// The payload buffered so far lives in c.writeBuf[maxFrameHeaderSize:w.pos].
// The header is written immediately in front of it, so the frame that is sent
// starts at framePos, which depends on how many header bytes are needed.
// extra is sent after the buffered bytes and is only permitted on server
// connections, because client payloads must be masked in place.
func (w *messageWriter) flushFrame(final bool, extra []byte) error {
	c := w.c
	length := w.pos - maxFrameHeaderSize + len(extra)

	// Check for invalid control frames.
	if isControl(w.frameType) &&
		(!final || length > maxControlFramePayloadSize) {
		return w.fatal(errInvalidControlFrame)
	}

	b0 := byte(w.frameType)
	if final {
		b0 |= finalBit
	}
	if w.compress {
		b0 |= rsv1Bit
	}
	// RSV1 is only set on the first frame flushed after compress was set.
	w.compress = false

	b1 := byte(0)
	if !c.isServer {
		b1 |= maskBit
	}

	// Assume that the frame starts at beginning of c.writeBuf.
	framePos := 0
	if c.isServer {
		// Adjust up if mask not included in the header.
		framePos = 4
	}

	switch {
	case length >= 65536:
		// 64-bit extended length: uses all eight length bytes.
		c.writeBuf[framePos] = b0
		c.writeBuf[framePos+1] = b1 | 127
		binary.BigEndian.PutUint64(c.writeBuf[framePos+2:], uint64(length))
	case length > 125:
		// 16-bit extended length: skip the six unused length bytes.
		framePos += 6
		c.writeBuf[framePos] = b0
		c.writeBuf[framePos+1] = b1 | 126
		binary.BigEndian.PutUint16(c.writeBuf[framePos+2:], uint16(length))
	default:
		// Length fits in the second header byte: skip all eight length bytes.
		framePos += 8
		c.writeBuf[framePos] = b0
		c.writeBuf[framePos+1] = b1 | byte(length)
	}

	if !c.isServer {
		// The mask key occupies the last four header bytes; the buffered
		// payload is masked in place.
		key := newMaskKey()
		copy(c.writeBuf[maxFrameHeaderSize-4:], key[:])
		maskBytes(key, 0, c.writeBuf[maxFrameHeaderSize:w.pos])
		if len(extra) > 0 {
			return c.writeFatal(errors.New("websocket: internal error, extra used in client mode"))
		}
	}

	// Write the buffers to the connection with best-effort detection of
	// concurrent writes. See the concurrency section in the package
	// documentation for more info.

	if c.isWriting {
		panic("concurrent write to websocket connection")
	}
	c.isWriting = true

	err := c.write(w.frameType, c.writeDeadline, c.writeBuf[framePos:w.pos], extra)

	if !c.isWriting {
		panic("concurrent write to websocket connection")
	}
	c.isWriting = false

	if err != nil {
		return w.fatal(err)
	}

	if final {
		c.writer = nil
		return nil
	}

	// Setup for next frame.
	w.pos = maxFrameHeaderSize
	w.frameType = continuationFrame
	return nil
}
| |
| func (w *messageWriter) ncopy(max int) (int, error) { |
| n := len(w.c.writeBuf) - w.pos |
| if n <= 0 { |
| if err := w.flushFrame(false, nil); err != nil { |
| return 0, err |
| } |
| n = len(w.c.writeBuf) - w.pos |
| } |
| if n > max { |
| n = max |
| } |
| return n, nil |
| } |
| |
| func (w *messageWriter) Write(p []byte) (int, error) { |
| if w.err != nil { |
| return 0, w.err |
| } |
| |
| if len(p) > 2*len(w.c.writeBuf) && w.c.isServer { |
| // Don't buffer large messages. |
| err := w.flushFrame(false, p) |
| if err != nil { |
| return 0, err |
| } |
| return len(p), nil |
| } |
| |
| nn := len(p) |
| for len(p) > 0 { |
| n, err := w.ncopy(len(p)) |
| if err != nil { |
| return 0, err |
| } |
| copy(w.c.writeBuf[w.pos:], p[:n]) |
| w.pos += n |
| p = p[n:] |
| } |
| return nn, nil |
| } |
| |
| func (w *messageWriter) WriteString(p string) (int, error) { |
| if w.err != nil { |
| return 0, w.err |
| } |
| |
| nn := len(p) |
| for len(p) > 0 { |
| n, err := w.ncopy(len(p)) |
| if err != nil { |
| return 0, err |
| } |
| copy(w.c.writeBuf[w.pos:], p[:n]) |
| w.pos += n |
| p = p[n:] |
| } |
| return nn, nil |
| } |
| |
| func (w *messageWriter) ReadFrom(r io.Reader) (nn int64, err error) { |
| if w.err != nil { |
| return 0, w.err |
| } |
| for { |
| if w.pos == len(w.c.writeBuf) { |
| err = w.flushFrame(false, nil) |
| if err != nil { |
| break |
| } |
| } |
| var n int |
| n, err = r.Read(w.c.writeBuf[w.pos:]) |
| w.pos += n |
| nn += int64(n) |
| if err != nil { |
| if err == io.EOF { |
| err = nil |
| } |
| break |
| } |
| } |
| return nn, err |
| } |
| |
| func (w *messageWriter) Close() error { |
| if w.err != nil { |
| return w.err |
| } |
| if err := w.flushFrame(true, nil); err != nil { |
| return err |
| } |
| w.err = errWriteClosed |
| return nil |
| } |
| |
| // WritePreparedMessage writes prepared message into connection. |
| func (c *Conn) WritePreparedMessage(pm *PreparedMessage) error { |
| frameType, frameData, err := pm.frame(prepareKey{ |
| isServer: c.isServer, |
| compress: c.newCompressionWriter != nil && c.enableWriteCompression && isData(pm.messageType), |
| compressionLevel: c.compressionLevel, |
| }) |
| if err != nil { |
| return err |
| } |
| if c.isWriting { |
| panic("concurrent write to websocket connection") |
| } |
| c.isWriting = true |
| err = c.write(frameType, c.writeDeadline, frameData, nil) |
| if !c.isWriting { |
| panic("concurrent write to websocket connection") |
| } |
| c.isWriting = false |
| return err |
| } |
| |
| // WriteMessage is a helper method for getting a writer using NextWriter, |
| // writing the message and closing the writer. |
| func (c *Conn) WriteMessage(messageType int, data []byte) error { |
| |
| if c.isServer && (c.newCompressionWriter == nil || !c.enableWriteCompression) { |
| // Fast path with no allocations and single frame. |
| |
| if err := c.prepWrite(messageType); err != nil { |
| return err |
| } |
| mw := messageWriter{c: c, frameType: messageType, pos: maxFrameHeaderSize} |
| n := copy(c.writeBuf[mw.pos:], data) |
| mw.pos += n |
| data = data[n:] |
| return mw.flushFrame(true, data) |
| } |
| |
| w, err := c.NextWriter(messageType) |
| if err != nil { |
| return err |
| } |
| if _, err = w.Write(data); err != nil { |
| return err |
| } |
| return w.Close() |
| } |
| |
// SetWriteDeadline sets the write deadline on the underlying network
// connection. After a write has timed out, the websocket state is corrupt and
// all future writes will return an error. A zero value for t means writes will
// not time out.
//
// The deadline is only stored here; it is applied to the network connection
// when the next frame is flushed. The returned error is always nil.
func (c *Conn) SetWriteDeadline(t time.Time) error {
	c.writeDeadline = t
	return nil
}
| |
| // Read methods |
| |
| func (c *Conn) advanceFrame() (int, error) { |
| |
| // 1. Skip remainder of previous frame. |
| |
| if c.readRemaining > 0 { |
| if _, err := io.CopyN(ioutil.Discard, c.br, c.readRemaining); err != nil { |
| return noFrame, err |
| } |
| } |
| |
| // 2. Read and parse first two bytes of frame header. |
| |
| p, err := c.read(2) |
| if err != nil { |
| return noFrame, err |
| } |
| |
| final := p[0]&finalBit != 0 |
| frameType := int(p[0] & 0xf) |
| mask := p[1]&maskBit != 0 |
| c.readRemaining = int64(p[1] & 0x7f) |
| |
| c.readDecompress = false |
| if c.newDecompressionReader != nil && (p[0]&rsv1Bit) != 0 { |
| c.readDecompress = true |
| p[0] &^= rsv1Bit |
| } |
| |
| if rsv := p[0] & (rsv1Bit | rsv2Bit | rsv3Bit); rsv != 0 { |
| return noFrame, c.handleProtocolError("unexpected reserved bits 0x" + strconv.FormatInt(int64(rsv), 16)) |
| } |
| |
| switch frameType { |
| case CloseMessage, PingMessage, PongMessage: |
| if c.readRemaining > maxControlFramePayloadSize { |
| return noFrame, c.handleProtocolError("control frame length > 125") |
| } |
| if !final { |
| return noFrame, c.handleProtocolError("control frame not final") |
| } |
| case TextMessage, BinaryMessage: |
| if !c.readFinal { |
| return noFrame, c.handleProtocolError("message start before final message frame") |
| } |
| c.readFinal = final |
| case continuationFrame: |
| if c.readFinal { |
| return noFrame, c.handleProtocolError("continuation after final message frame") |
| } |
| c.readFinal = final |
| default: |
| return noFrame, c.handleProtocolError("unknown opcode " + strconv.Itoa(frameType)) |
| } |
| |
| // 3. Read and parse frame length. |
| |
| switch c.readRemaining { |
| case 126: |
| p, err := c.read(2) |
| if err != nil { |
| return noFrame, err |
| } |
| c.readRemaining = int64(binary.BigEndian.Uint16(p)) |
| case 127: |
| p, err := c.read(8) |
| if err != nil { |
| return noFrame, err |
| } |
| c.readRemaining = int64(binary.BigEndian.Uint64(p)) |
| } |
| |
| // 4. Handle frame masking. |
| |
| if mask != c.isServer { |
| return noFrame, c.handleProtocolError("incorrect mask flag") |
| } |
| |
| if mask { |
| c.readMaskPos = 0 |
| p, err := c.read(len(c.readMaskKey)) |
| if err != nil { |
| return noFrame, err |
| } |
| copy(c.readMaskKey[:], p) |
| } |
| |
| // 5. For text and binary messages, enforce read limit and return. |
| |
| if frameType == continuationFrame || frameType == TextMessage || frameType == BinaryMessage { |
| |
| c.readLength += c.readRemaining |
| if c.readLimit > 0 && c.readLength > c.readLimit { |
| c.WriteControl(CloseMessage, FormatCloseMessage(CloseMessageTooBig, ""), time.Now().Add(writeWait)) |
| return noFrame, ErrReadLimit |
| } |
| |
| return frameType, nil |
| } |
| |
| // 6. Read control frame payload. |
| |
| var payload []byte |
| if c.readRemaining > 0 { |
| payload, err = c.read(int(c.readRemaining)) |
| c.readRemaining = 0 |
| if err != nil { |
| return noFrame, err |
| } |
| if c.isServer { |
| maskBytes(c.readMaskKey, 0, payload) |
| } |
| } |
| |
| // 7. Process control frame payload. |
| |
| switch frameType { |
| case PongMessage: |
| if err := c.handlePong(string(payload)); err != nil { |
| return noFrame, err |
| } |
| case PingMessage: |
| if err := c.handlePing(string(payload)); err != nil { |
| return noFrame, err |
| } |
| case CloseMessage: |
| closeCode := CloseNoStatusReceived |
| closeText := "" |
| if len(payload) >= 2 { |
| closeCode = int(binary.BigEndian.Uint16(payload)) |
| if !isValidReceivedCloseCode(closeCode) { |
| return noFrame, c.handleProtocolError("invalid close code") |
| } |
| closeText = string(payload[2:]) |
| if !utf8.ValidString(closeText) { |
| return noFrame, c.handleProtocolError("invalid utf8 payload in close frame") |
| } |
| } |
| if err := c.handleClose(closeCode, closeText); err != nil { |
| return noFrame, err |
| } |
| return noFrame, &CloseError{Code: closeCode, Text: closeText} |
| } |
| |
| return frameType, nil |
| } |
| |
| func (c *Conn) handleProtocolError(message string) error { |
| c.WriteControl(CloseMessage, FormatCloseMessage(CloseProtocolError, message), time.Now().Add(writeWait)) |
| return errors.New("websocket: " + message) |
| } |
| |
| // NextReader returns the next data message received from the peer. The |
| // returned messageType is either TextMessage or BinaryMessage. |
| // |
| // There can be at most one open reader on a connection. NextReader discards |
| // the previous message if the application has not already consumed it. |
| // |
| // Applications must break out of the application's read loop when this method |
| // returns a non-nil error value. Errors returned from this method are |
| // permanent. Once this method returns a non-nil error, all subsequent calls to |
| // this method return the same error. |
| func (c *Conn) NextReader() (messageType int, r io.Reader, err error) { |
| // Close previous reader, only relevant for decompression. |
| if c.reader != nil { |
| c.reader.Close() |
| c.reader = nil |
| } |
| |
| c.messageReader = nil |
| c.readLength = 0 |
| |
| for c.readErr == nil { |
| frameType, err := c.advanceFrame() |
| if err != nil { |
| c.readErr = hideTempErr(err) |
| break |
| } |
| if frameType == TextMessage || frameType == BinaryMessage { |
| c.messageReader = &messageReader{c} |
| c.reader = c.messageReader |
| if c.readDecompress { |
| c.reader = c.newDecompressionReader(c.reader) |
| } |
| return frameType, c.reader, nil |
| } |
| } |
| |
| // Applications that do handle the error returned from this method spin in |
| // tight loop on connection failure. To help application developers detect |
| // this error, panic on repeated reads to the failed connection. |
| c.readErrCount++ |
| if c.readErrCount >= 1000 { |
| panic("repeated read on failed websocket connection") |
| } |
| |
| return noFrame, nil, c.readErr |
| } |
| |
// messageReader reads the payload of the current message from its
// connection, moving on to continuation frames as needed.
type messageReader struct{ c *Conn }

// Read reads message payload into b. It returns io.EOF once the final frame
// of the message has been consumed, and also when r is no longer the
// connection's current message reader. Read errors are stored in c.readErr,
// which makes them permanent for the connection.
func (r *messageReader) Read(b []byte) (int, error) {
	c := r.c
	if c.messageReader != r {
		return 0, io.EOF
	}

	for c.readErr == nil {

		if c.readRemaining > 0 {
			// Never read past the end of the current frame.
			if int64(len(b)) > c.readRemaining {
				b = b[:c.readRemaining]
			}
			n, err := c.br.Read(b)
			c.readErr = hideTempErr(err)
			if c.isServer {
				// Frames read by a server are masked; unmask in place,
				// carrying the key position across calls.
				c.readMaskPos = maskBytes(c.readMaskKey, c.readMaskPos, b[:n])
			}
			c.readRemaining -= int64(n)
			if c.readRemaining > 0 && c.readErr == io.EOF {
				// The stream ended in the middle of a frame.
				c.readErr = errUnexpectedEOF
			}
			return n, c.readErr
		}

		if c.readFinal {
			// The last frame of the message has been fully read.
			c.messageReader = nil
			return 0, io.EOF
		}

		// The current frame is exhausted but the message continues: move to
		// the next frame. Only continuation frames are expected here.
		frameType, err := c.advanceFrame()
		switch {
		case err != nil:
			c.readErr = hideTempErr(err)
		case frameType == TextMessage || frameType == BinaryMessage:
			c.readErr = errors.New("websocket: internal error, unexpected text or binary in Reader")
		}
	}

	err := c.readErr
	if err == io.EOF && c.messageReader == r {
		// EOF while this message is still open means it was cut short.
		err = errUnexpectedEOF
	}
	return 0, err
}

// Close does nothing and always returns nil.
func (r *messageReader) Close() error {
	return nil
}
| |
| // ReadMessage is a helper method for getting a reader using NextReader and |
| // reading from that reader to a buffer. |
| func (c *Conn) ReadMessage() (messageType int, p []byte, err error) { |
| var r io.Reader |
| messageType, r, err = c.NextReader() |
| if err != nil { |
| return messageType, nil, err |
| } |
| p, err = ioutil.ReadAll(r) |
| return messageType, p, err |
| } |
| |
// SetReadDeadline sets the read deadline on the underlying network connection.
// After a read has timed out, the websocket connection state is corrupt and
// all future reads will return an error. A zero value for t means reads will
// not time out.
//
// The call is forwarded to the underlying connection and its error returned.
func (c *Conn) SetReadDeadline(t time.Time) error {
	return c.conn.SetReadDeadline(t)
}
| |
// SetReadLimit sets the maximum size for a message read from the peer. If a
// message exceeds the limit, the connection sends a close frame to the peer
// and returns ErrReadLimit to the application.
//
// The limit is only enforced when it is greater than zero.
func (c *Conn) SetReadLimit(limit int64) {
	c.readLimit = limit
}
| |
// CloseHandler returns the current close handler. This is the handler
// installed by SetCloseHandler, including the default one.
func (c *Conn) CloseHandler() func(code int, text string) error {
	return c.handleClose
}
| |
| // SetCloseHandler sets the handler for close messages received from the peer. |
| // The code argument to h is the received close code or CloseNoStatusReceived |
| // if the close message is empty. The default close handler sends a close frame |
| // back to the peer. |
| // |
| // The application must read the connection to process close messages as |
| // described in the section on Control Frames above. |
| // |
| // The connection read methods return a CloseError when a close frame is |
| // received. Most applications should handle close messages as part of their |
| // normal error handling. Applications should only set a close handler when the |
| // application must perform some action before sending a close frame back to |
| // the peer. |
| func (c *Conn) SetCloseHandler(h func(code int, text string) error) { |
| if h == nil { |
| h = func(code int, text string) error { |
| message := []byte{} |
| if code != CloseNoStatusReceived { |
| message = FormatCloseMessage(code, "") |
| } |
| c.WriteControl(CloseMessage, message, time.Now().Add(writeWait)) |
| return nil |
| } |
| } |
| c.handleClose = h |
| } |
| |
// PingHandler returns the current ping handler. This is the handler
// installed by SetPingHandler, including the default one.
func (c *Conn) PingHandler() func(appData string) error {
	return c.handlePing
}
| |
| // SetPingHandler sets the handler for ping messages received from the peer. |
| // The appData argument to h is the PING frame application data. The default |
| // ping handler sends a pong to the peer. |
| // |
| // The application must read the connection to process ping messages as |
| // described in the section on Control Frames above. |
| func (c *Conn) SetPingHandler(h func(appData string) error) { |
| if h == nil { |
| h = func(message string) error { |
| err := c.WriteControl(PongMessage, []byte(message), time.Now().Add(writeWait)) |
| if err == ErrCloseSent { |
| return nil |
| } else if e, ok := err.(net.Error); ok && e.Temporary() { |
| return nil |
| } |
| return err |
| } |
| } |
| c.handlePing = h |
| } |
| |
// PongHandler returns the current pong handler. This is the handler
// installed by SetPongHandler, including the default one.
func (c *Conn) PongHandler() func(appData string) error {
	return c.handlePong
}
| |
| // SetPongHandler sets the handler for pong messages received from the peer. |
| // The appData argument to h is the PONG frame application data. The default |
| // pong handler does nothing. |
| // |
| // The application must read the connection to process ping messages as |
| // described in the section on Control Frames above. |
| func (c *Conn) SetPongHandler(h func(appData string) error) { |
| if h == nil { |
| h = func(string) error { return nil } |
| } |
| c.handlePong = h |
| } |
| |
// UnderlyingConn returns the internal net.Conn. This can be used to make
// further modifications to connection specific flags.
func (c *Conn) UnderlyingConn() net.Conn {
	return c.conn
}
| |
// EnableWriteCompression enables and disables write compression of
// subsequent text and binary messages. This function is a noop if
// compression was not negotiated with the peer.
//
// Only the flag is stored here; it is consulted when the next message is
// started.
func (c *Conn) EnableWriteCompression(enable bool) {
	c.enableWriteCompression = enable
}
| |
| // SetCompressionLevel sets the flate compression level for subsequent text and |
| // binary messages. This function is a noop if compression was not negotiated |
| // with the peer. See the compress/flate package for a description of |
| // compression levels. |
| func (c *Conn) SetCompressionLevel(level int) error { |
| if !isValidCompressionLevel(level) { |
| return errors.New("websocket: invalid compression level") |
| } |
| c.compressionLevel = level |
| return nil |
| } |
| |
| // FormatCloseMessage formats closeCode and text as a WebSocket close message. |
| func FormatCloseMessage(closeCode int, text string) []byte { |
| buf := make([]byte, 2+len(text)) |
| binary.BigEndian.PutUint16(buf, uint16(closeCode)) |
| copy(buf[2:], text) |
| return buf |
| } |