blob: f8f38844cd9bfa68a669a0c3b4bde34ffe54f3d5 [file] [log] [blame]
Don Newton379ae252019-04-01 12:17:06 -04001// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package wiremessage
8
9import (
10 "errors"
11 "fmt"
12)
13
14// Compressed represents the OP_COMPRESSED message of the MongoDB wire protocol.
15type Compressed struct {
16 MsgHeader Header
17 OriginalOpCode OpCode
18 UncompressedSize int32
19 CompressorID CompressorID
20 CompressedMessage []byte
21}
22
23// MarshalWireMessage implements the Marshaler and WireMessage interfaces.
24func (c Compressed) MarshalWireMessage() ([]byte, error) {
25 b := make([]byte, 0, c.Len())
26 return c.AppendWireMessage(b)
27}
28
29// ValidateWireMessage implements the Validator and WireMessage interfaces.
30func (c Compressed) ValidateWireMessage() error {
31 if int(c.MsgHeader.MessageLength) != c.Len() {
32 return errors.New("incorrect header: message length is not correct")
33 }
34
35 if c.MsgHeader.OpCode != OpCompressed {
36 return errors.New("incorrect header: opcode is not OpCompressed")
37 }
38
39 if c.OriginalOpCode != c.MsgHeader.OpCode {
40 return errors.New("incorrect header: original opcode does not match opcode in message header")
41 }
42 return nil
43}
44
45// AppendWireMessage implements the Appender and WireMessage interfaces.
46//
47// AppendWireMessage will set the MessageLength property of MsgHeader if it is 0. It will also set the OpCode to
48// OpCompressed if the OpCode is 0. If either of these properties are non-zero and not correct, this method will return
49// both the []byte with the wire message appended to it and an invalid header error.
50func (c Compressed) AppendWireMessage(b []byte) ([]byte, error) {
51 err := c.MsgHeader.SetDefaults(c.Len(), OpCompressed)
52
53 b = c.MsgHeader.AppendHeader(b)
54 b = appendInt32(b, int32(c.OriginalOpCode))
55 b = appendInt32(b, c.UncompressedSize)
56 b = append(b, byte(c.CompressorID))
57 b = append(b, c.CompressedMessage...)
58
59 return b, err
60}
61
62// String implements the fmt.Stringer interface.
63func (c Compressed) String() string {
64 return fmt.Sprintf(
65 `OP_COMPRESSED{MsgHeader: %s, Uncompressed Size: %d, CompressorId: %d, Compressed message: %s}`,
66 c.MsgHeader, c.UncompressedSize, c.CompressorID, c.CompressedMessage,
67 )
68}
69
70// Len implements the WireMessage interface.
71func (c Compressed) Len() int {
72 // Header + OpCode + UncompressedSize + CompressorId + CompressedMessage
73 return 16 + 4 + 4 + 1 + len(c.CompressedMessage)
74}
75
76// UnmarshalWireMessage implements the Unmarshaler interface.
77func (c *Compressed) UnmarshalWireMessage(b []byte) error {
78 var err error
79 c.MsgHeader, err = ReadHeader(b, 0)
80 if err != nil {
81 return err
82 }
83
84 if len(b) < int(c.MsgHeader.MessageLength) {
85 return Error{Type: ErrOpCompressed, Message: "[]byte too small"}
86 }
87
88 c.OriginalOpCode = OpCode(readInt32(b, 16)) // skip first 16 for header
89 c.UncompressedSize = readInt32(b, 20)
90 c.CompressorID = CompressorID(b[24])
91
92 // messageLength - Header - OpCode - UncompressedSize - CompressorId
93 msgLen := c.MsgHeader.MessageLength - 16 - 4 - 4 - 1
94 c.CompressedMessage = b[25 : 25+msgLen]
95
96 return nil
97}
98
99// CompressorID is the ID for each type of Compressor.
100type CompressorID uint8
101
102// These constants represent the individual compressor IDs for an OP_COMPRESSED.
103const (
104 CompressorNoOp CompressorID = iota
105 CompressorSnappy
106 CompressorZLib
107)
108
109// DefaultZlibLevel is the default level for zlib compression
110const DefaultZlibLevel = 6