| package sarama |
| |
| import ( |
| "encoding/binary" |
| "time" |
| ) |
| |
| const ( |
| isTransactionalMask = 0x10 |
| controlMask = 0x20 |
| maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1 |
| ) |
| |
| //RecordHeader stores key and value for a record header |
| type RecordHeader struct { |
| Key []byte |
| Value []byte |
| } |
| |
| func (h *RecordHeader) encode(pe packetEncoder) error { |
| if err := pe.putVarintBytes(h.Key); err != nil { |
| return err |
| } |
| return pe.putVarintBytes(h.Value) |
| } |
| |
| func (h *RecordHeader) decode(pd packetDecoder) (err error) { |
| if h.Key, err = pd.getVarintBytes(); err != nil { |
| return err |
| } |
| |
| if h.Value, err = pd.getVarintBytes(); err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| //Record is kafka record type |
| type Record struct { |
| Headers []*RecordHeader |
| |
| Attributes int8 |
| TimestampDelta time.Duration |
| OffsetDelta int64 |
| Key []byte |
| Value []byte |
| length varintLengthField |
| } |
| |
| func (r *Record) encode(pe packetEncoder) error { |
| pe.push(&r.length) |
| pe.putInt8(r.Attributes) |
| pe.putVarint(int64(r.TimestampDelta / time.Millisecond)) |
| pe.putVarint(r.OffsetDelta) |
| if err := pe.putVarintBytes(r.Key); err != nil { |
| return err |
| } |
| if err := pe.putVarintBytes(r.Value); err != nil { |
| return err |
| } |
| pe.putVarint(int64(len(r.Headers))) |
| |
| for _, h := range r.Headers { |
| if err := h.encode(pe); err != nil { |
| return err |
| } |
| } |
| |
| return pe.pop() |
| } |
| |
| func (r *Record) decode(pd packetDecoder) (err error) { |
| if err = pd.push(&r.length); err != nil { |
| return err |
| } |
| |
| if r.Attributes, err = pd.getInt8(); err != nil { |
| return err |
| } |
| |
| timestamp, err := pd.getVarint() |
| if err != nil { |
| return err |
| } |
| r.TimestampDelta = time.Duration(timestamp) * time.Millisecond |
| |
| if r.OffsetDelta, err = pd.getVarint(); err != nil { |
| return err |
| } |
| |
| if r.Key, err = pd.getVarintBytes(); err != nil { |
| return err |
| } |
| |
| if r.Value, err = pd.getVarintBytes(); err != nil { |
| return err |
| } |
| |
| numHeaders, err := pd.getVarint() |
| if err != nil { |
| return err |
| } |
| |
| if numHeaders >= 0 { |
| r.Headers = make([]*RecordHeader, numHeaders) |
| } |
| for i := int64(0); i < numHeaders; i++ { |
| hdr := new(RecordHeader) |
| if err := hdr.decode(pd); err != nil { |
| return err |
| } |
| r.Headers[i] = hdr |
| } |
| |
| return pd.pop() |
| } |