blob: 12cd7c3d510489522dfe489a275ba770910ed71f [file] [log] [blame]
package sarama
import (
"bytes"
"compress/gzip"
"fmt"
"sync"
snappy "github.com/eapache/go-xerial-snappy"
"github.com/pierrec/lz4"
)
var (
lz4WriterPool = sync.Pool{
New: func() interface{} {
return lz4.NewWriter(nil)
},
}
gzipWriterPool = sync.Pool{
New: func() interface{} {
return gzip.NewWriter(nil)
},
}
gzipWriterPoolForCompressionLevel1 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 1)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel2 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 2)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel3 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 3)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel4 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 4)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel5 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 5)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel6 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 6)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel7 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 7)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel8 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 8)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel9 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 9)
if err != nil {
panic(err)
}
return gz
},
}
)
func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
switch cc {
case CompressionNone:
return data, nil
case CompressionGZIP:
var (
err error
buf bytes.Buffer
writer *gzip.Writer
)
switch level {
case CompressionLevelDefault:
writer = gzipWriterPool.Get().(*gzip.Writer)
defer gzipWriterPool.Put(writer)
writer.Reset(&buf)
case 1:
writer = gzipWriterPoolForCompressionLevel1.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel1.Put(writer)
writer.Reset(&buf)
case 2:
writer = gzipWriterPoolForCompressionLevel2.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel2.Put(writer)
writer.Reset(&buf)
case 3:
writer = gzipWriterPoolForCompressionLevel3.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel3.Put(writer)
writer.Reset(&buf)
case 4:
writer = gzipWriterPoolForCompressionLevel4.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel4.Put(writer)
writer.Reset(&buf)
case 5:
writer = gzipWriterPoolForCompressionLevel5.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel5.Put(writer)
writer.Reset(&buf)
case 6:
writer = gzipWriterPoolForCompressionLevel6.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel6.Put(writer)
writer.Reset(&buf)
case 7:
writer = gzipWriterPoolForCompressionLevel7.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel7.Put(writer)
writer.Reset(&buf)
case 8:
writer = gzipWriterPoolForCompressionLevel8.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel8.Put(writer)
writer.Reset(&buf)
case 9:
writer = gzipWriterPoolForCompressionLevel9.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel9.Put(writer)
writer.Reset(&buf)
default:
writer, err = gzip.NewWriterLevel(&buf, level)
if err != nil {
return nil, err
}
}
if _, err := writer.Write(data); err != nil {
return nil, err
}
if err := writer.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
case CompressionSnappy:
return snappy.Encode(data), nil
case CompressionLZ4:
writer := lz4WriterPool.Get().(*lz4.Writer)
defer lz4WriterPool.Put(writer)
var buf bytes.Buffer
writer.Reset(&buf)
if _, err := writer.Write(data); err != nil {
return nil, err
}
if err := writer.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
case CompressionZSTD:
return zstdCompress(nil, data)
default:
return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
}
}