| package sarama |
| |
| import ( |
| "bytes" |
| "compress/gzip" |
| "fmt" |
| "sync" |
| |
| "github.com/eapache/go-xerial-snappy" |
| "github.com/pierrec/lz4" |
| ) |
| |
| var ( |
| lz4WriterPool = sync.Pool{ |
| New: func() interface{} { |
| return lz4.NewWriter(nil) |
| }, |
| } |
| |
| gzipWriterPool = sync.Pool{ |
| New: func() interface{} { |
| return gzip.NewWriter(nil) |
| }, |
| } |
| ) |
| |
| func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) { |
| switch cc { |
| case CompressionNone: |
| return data, nil |
| case CompressionGZIP: |
| var ( |
| err error |
| buf bytes.Buffer |
| writer *gzip.Writer |
| ) |
| if level != CompressionLevelDefault { |
| writer, err = gzip.NewWriterLevel(&buf, level) |
| if err != nil { |
| return nil, err |
| } |
| } else { |
| writer = gzipWriterPool.Get().(*gzip.Writer) |
| defer gzipWriterPool.Put(writer) |
| writer.Reset(&buf) |
| } |
| if _, err := writer.Write(data); err != nil { |
| return nil, err |
| } |
| if err := writer.Close(); err != nil { |
| return nil, err |
| } |
| return buf.Bytes(), nil |
| case CompressionSnappy: |
| return snappy.Encode(data), nil |
| case CompressionLZ4: |
| writer := lz4WriterPool.Get().(*lz4.Writer) |
| defer lz4WriterPool.Put(writer) |
| |
| var buf bytes.Buffer |
| writer.Reset(&buf) |
| |
| if _, err := writer.Write(data); err != nil { |
| return nil, err |
| } |
| if err := writer.Close(); err != nil { |
| return nil, err |
| } |
| return buf.Bytes(), nil |
| case CompressionZSTD: |
| return zstdCompress(nil, data) |
| default: |
| return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)} |
| } |
| } |