[VOL-4663] create voltha event topic (voltha.events) with configurable number of partitions and replication factor
Change-Id: Ibaf8681ccdbffcc8a3c68612c49d7822a20e1b14
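
Not part of the diff itself: as context for the commit message, a minimal sketch of creating such a topic with a configurable partition count and replication factor through sarama's ClusterAdmin API (this change vendors an lz4 update in sarama's dependency tree; the broker address, Kafka version, and values below are illustrative assumptions, not taken from this change):

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

// createEventTopic creates the given topic with a configurable number of
// partitions and replication factor. All values are illustrative.
func createEventTopic(brokers []string, topic string, partitions int32, replication int16) error {
	cfg := sarama.NewConfig()
	// The admin API needs a broker version that supports it (assumed here).
	cfg.Version = sarama.V1_0_0_0
	admin, err := sarama.NewClusterAdmin(brokers, cfg)
	if err != nil {
		return err
	}
	defer admin.Close()

	detail := &sarama.TopicDetail{
		NumPartitions:     partitions,
		ReplicationFactor: replication,
	}
	return admin.CreateTopic(topic, detail, false)
}

func main() {
	// Hypothetical broker address and settings.
	if err := createEventTopic([]string{"kafka:9092"}, "voltha.events", 3, 2); err != nil {
		log.Fatal(err)
	}
}
```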
diff --git a/vendor/github.com/pierrec/lz4/.gitignore b/vendor/github.com/pierrec/lz4/.gitignore
index e48bab3..5e98735 100644
--- a/vendor/github.com/pierrec/lz4/.gitignore
+++ b/vendor/github.com/pierrec/lz4/.gitignore
@@ -30,4 +30,5 @@
# End of https://www.gitignore.io/api/macos
-lz4c/lz4c
+cmd/*/*exe
+.idea
\ No newline at end of file
diff --git a/vendor/github.com/pierrec/lz4/.travis.yml b/vendor/github.com/pierrec/lz4/.travis.yml
index 658910d..fd6c6db 100644
--- a/vendor/github.com/pierrec/lz4/.travis.yml
+++ b/vendor/github.com/pierrec/lz4/.travis.yml
@@ -2,12 +2,12 @@
env:
- GO111MODULE=off
- - GO111MODULE=on
go:
- 1.9.x
- 1.10.x
- 1.11.x
+ - 1.12.x
- master
matrix:
diff --git a/vendor/github.com/pierrec/lz4/README.md b/vendor/github.com/pierrec/lz4/README.md
index 50a10ee..4ee388e 100644
--- a/vendor/github.com/pierrec/lz4/README.md
+++ b/vendor/github.com/pierrec/lz4/README.md
@@ -1,24 +1,90 @@
-[GoDoc](https://godoc.org/github.com/pierrec/lz4)
+# lz4 : LZ4 compression in pure Go
-# lz4
-LZ4 compression and decompression in pure Go.
+[GoDoc](https://godoc.org/github.com/pierrec/lz4)
+[Build Status](https://travis-ci.org/pierrec/lz4)
+[Go Report Card](https://goreportcard.com/report/github.com/pierrec/lz4)
+[GitHub tag](https://github.com/pierrec/lz4/tags)
-## Usage
+## Overview
-```go
-import "github.com/pierrec/lz4"
+This package provides a streaming interface to [LZ4 data streams](http://fastcompression.blogspot.fr/2013/04/lz4-streaming-format-final.html) as well as low level compress and uncompress functions for LZ4 data blocks.
+The implementation is based on the reference C [one](https://github.com/lz4/lz4).
+
+## Install
+
+Assuming you have the go toolchain installed:
+
+```
+go get github.com/pierrec/lz4
```
-## Description
-Package lz4 implements reading and writing lz4 compressed data (a frame),
-as specified in http://fastcompression.blogspot.fr/2013/04/lz4-streaming-format-final.html.
+There is a command line interface tool to compress and decompress LZ4 files.
-This package is **compatible with the LZ4 frame format** although the block level compression
-and decompression functions are exposed and are fully compatible with the lz4 block format
-definition, they are low level and should not be used directly.
+```
+go install github.com/pierrec/lz4/cmd/lz4c
+```
-For a complete description of an lz4 compressed block, see:
-http://fastcompression.blogspot.fr/2011/05/lz4-explained.html
+Usage
-See https://github.com/Cyan4973/lz4 for the reference C implementation.
+```
+Usage of lz4c:
+ -version
+ print the program version
+Subcommands:
+Compress the given files or from stdin to stdout.
+compress [arguments] [<file name> ...]
+ -bc
+ enable block checksum
+ -l int
+ compression level (0=fastest)
+ -sc
+ disable stream checksum
+ -size string
+ block max size [64K,256K,1M,4M] (default "4M")
+
+Uncompress the given files or from stdin to stdout.
+uncompress [arguments] [<file name> ...]
+
+```
+
+
+## Example
+
+```
+// Compress and uncompress an input string.
+s := "hello world"
+r := strings.NewReader(s)
+
+// The pipe will uncompress the data from the writer.
+pr, pw := io.Pipe()
+zw := lz4.NewWriter(pw)
+zr := lz4.NewReader(pr)
+
+go func() {
+ // Compress the input string.
+ _, _ = io.Copy(zw, r)
+ _ = zw.Close() // Make sure the writer is closed
+ _ = pw.Close() // Terminate the pipe
+}()
+
+_, _ = io.Copy(os.Stdout, zr)
+
+// Output:
+// hello world
+```
+
+## Contributing
+
+Contributions are very welcome, whether for bug fixes or performance improvements!
+
+- Open an issue with a proper description
+- Send a pull request with appropriate test case(s)
+
+## Contributors
+
+Thanks to all [contributors](https://github.com/pierrec/lz4/graphs/contributors) so far!
+
+Special thanks to [@Zariel](https://github.com/Zariel) for his asm implementation of the decoder.
+
+Special thanks to [@klauspost](https://github.com/klauspost) for his work on optimizing the code.
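
Editorial note: the overview above also mentions the low-level block functions. A minimal standalone sketch of that API as it stands in this vendored version, where the diff below documents that CompressBlock accepts a nil hash table and falls back to an internal pool:

```go
package main

import (
	"fmt"

	"github.com/pierrec/lz4"
)

func main() {
	src := []byte("hello world hello world hello world")

	// Worst-case output size, so compression never reports "incompressible".
	dst := make([]byte, lz4.CompressBlockBound(len(src)))

	// A nil hash table is allowed in this version; one is drawn from a pool.
	n, err := lz4.CompressBlock(src, dst, nil)
	if err != nil {
		panic(err)
	}

	// Round-trip: the destination must be large enough for the original data.
	out := make([]byte, len(src))
	m, err := lz4.UncompressBlock(dst[:n], out)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out[:m])) // hello world hello world hello world
}
```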
diff --git a/vendor/github.com/pierrec/lz4/block.go b/vendor/github.com/pierrec/lz4/block.go
index d96e0e7..664d9be 100644
--- a/vendor/github.com/pierrec/lz4/block.go
+++ b/vendor/github.com/pierrec/lz4/block.go
@@ -2,21 +2,14 @@
import (
"encoding/binary"
- "errors"
+ "math/bits"
+ "sync"
)
-var (
- // ErrInvalidSourceShortBuffer is returned by UncompressBlock or CompressBLock when a compressed
- // block is corrupted or the destination buffer is not large enough for the uncompressed data.
- ErrInvalidSourceShortBuffer = errors.New("lz4: invalid source or destination buffer too short")
- // ErrInvalid is returned when reading an invalid LZ4 archive.
- ErrInvalid = errors.New("lz4: bad magic number")
-)
-
-// blockHash hashes 4 bytes into a value < winSize.
-func blockHash(x uint32) uint32 {
- const hasher uint32 = 2654435761 // Knuth multiplicative hash.
- return x * hasher >> hashShift
+// blockHash hashes the lower 6 bytes into a value < htSize.
+func blockHash(x uint64) uint32 {
+ const prime6bytes = 227718039650203
+ return uint32(((x << (64 - 48)) * prime6bytes) >> (64 - hashLog))
}
// CompressBlockBound returns the maximum size of a given buffer of size n, when not compressible.
@@ -30,79 +23,127 @@
// The destination buffer must be sized appropriately.
//
// An error is returned if the source data is invalid or the destination buffer is too small.
-func UncompressBlock(src, dst []byte) (di int, err error) {
- sn := len(src)
- if sn == 0 {
+func UncompressBlock(src, dst []byte) (int, error) {
+ if len(src) == 0 {
return 0, nil
}
-
- di = decodeBlock(dst, src)
- if di < 0 {
- return 0, ErrInvalidSourceShortBuffer
+ if di := decodeBlock(dst, src); di >= 0 {
+ return di, nil
}
- return di, nil
+ return 0, ErrInvalidSourceShortBuffer
}
// CompressBlock compresses the source buffer into the destination one.
// This is the fast version of LZ4 compression and also the default one.
-// The size of hashTable must be at least 64Kb.
//
-// The size of the compressed data is returned. If it is 0 and no error, then the data is incompressible.
+// The argument hashTable is scratch space for a hash table used by the
+// compressor. If provided, it should have length at least 1<<16. If it is
+// shorter (or nil), CompressBlock allocates its own hash table.
+//
+// The size of the compressed data is returned.
+//
+// If the destination buffer size is lower than CompressBlockBound and
+// the compressed size is 0 and no error, then the data is incompressible.
//
// An error is returned if the destination buffer is too small.
-func CompressBlock(src, dst []byte, hashTable []int) (di int, err error) {
- defer func() {
- if recover() != nil {
- err = ErrInvalidSourceShortBuffer
- }
- }()
+func CompressBlock(src, dst []byte, hashTable []int) (_ int, err error) {
+ defer recoverBlock(&err)
- sn, dn := len(src)-mfLimit, len(dst)
- if sn <= 0 || dn == 0 {
- return 0, nil
+ // Return 0, nil only if the destination buffer size is < CompressBlockBound.
+ isNotCompressible := len(dst) < CompressBlockBound(len(src))
+
+ // adaptSkipLog sets how quickly the compressor begins skipping blocks when data is incompressible.
+ // This significantly speeds up incompressible data and usually has very small impact on compression.
+ // bytes to skip = 1 + (bytes since last match >> adaptSkipLog)
+ const adaptSkipLog = 7
+ if len(hashTable) < htSize {
+ htIface := htPool.Get()
+ defer htPool.Put(htIface)
+ hashTable = (*(htIface).(*[htSize]int))[:]
}
- var si int
+ // Prove to the compiler the table has at least htSize elements.
+ // The compiler can see that "uint32() >> hashShift" cannot be out of bounds.
+ hashTable = hashTable[:htSize]
+
+ // si: Current position of the search.
+ // anchor: Position of the current literals.
+ var si, di, anchor int
+ sn := len(src) - mfLimit
+ if sn <= 0 {
+ goto lastLiterals
+ }
// Fast scan strategy: the hash table only stores the last 4 bytes sequences.
- // const accInit = 1 << skipStrength
-
- anchor := si // Position of the current literals.
- // acc := accInit // Variable step: improves performance on non-compressible data.
-
for si < sn {
- // Hash the next 4 bytes (sequence)...
- match := binary.LittleEndian.Uint32(src[si:])
+ // Hash the next 6 bytes (sequence)...
+ match := binary.LittleEndian.Uint64(src[si:])
h := blockHash(match)
+ h2 := blockHash(match >> 8)
+ // We check a match at s, s+1 and s+2 and pick the first one we get.
+		// Checking 3 only requires us to load the source once.
ref := hashTable[h]
+ ref2 := hashTable[h2]
hashTable[h] = si
- if ref >= sn { // Invalid reference (dirty hashtable).
- si++
- continue
- }
+ hashTable[h2] = si + 1
offset := si - ref
+
+ // If offset <= 0 we got an old entry in the hash table.
if offset <= 0 || offset >= winSize || // Out of window.
- match != binary.LittleEndian.Uint32(src[ref:]) { // Hash collision on different matches.
- // si += acc >> skipStrength
- // acc++
- si++
- continue
+ uint32(match) != binary.LittleEndian.Uint32(src[ref:]) { // Hash collision on different matches.
+ // No match. Start calculating another hash.
+ // The processor can usually do this out-of-order.
+ h = blockHash(match >> 16)
+ ref = hashTable[h]
+
+ // Check the second match at si+1
+ si += 1
+ offset = si - ref2
+
+ if offset <= 0 || offset >= winSize ||
+ uint32(match>>8) != binary.LittleEndian.Uint32(src[ref2:]) {
+ // No match. Check the third match at si+2
+ si += 1
+ offset = si - ref
+ hashTable[h] = si
+
+ if offset <= 0 || offset >= winSize ||
+ uint32(match>>16) != binary.LittleEndian.Uint32(src[ref:]) {
+ // Skip one extra byte (at si+3) before we check 3 matches again.
+ si += 2 + (si-anchor)>>adaptSkipLog
+ continue
+ }
+ }
}
// Match found.
- // acc = accInit
lLen := si - anchor // Literal length.
+ // We already matched 4 bytes.
+ mLen := 4
- // Encode match length part 1.
- si += minMatch
- mLen := si // Match length has minMatch already.
- // Find the longest match, first looking by batches of 8 bytes.
- for si < sn && binary.LittleEndian.Uint64(src[si:]) == binary.LittleEndian.Uint64(src[si-offset:]) {
- si += 8
+ // Extend backwards if we can, reducing literals.
+ tOff := si - offset - 1
+ for lLen > 0 && tOff >= 0 && src[si-1] == src[tOff] {
+ si--
+ tOff--
+ lLen--
+ mLen++
}
- // Then byte by byte.
- for si < sn && src[si] == src[si-offset] {
- si++
+
+ // Add the match length, so we continue search at the end.
+ // Use mLen to store the offset base.
+ si, mLen = si+mLen, si+minMatch
+
+ // Find the longest match by looking by batches of 8 bytes.
+ for si+8 < sn {
+ x := binary.LittleEndian.Uint64(src[si:]) ^ binary.LittleEndian.Uint64(src[si-offset:])
+ if x == 0 {
+ si += 8
+ } else {
+				// Stop at the first non-zero byte.
+ si += bits.TrailingZeros64(x) >> 3
+ break
+ }
}
mLen = si - mLen
@@ -145,9 +186,17 @@
dst[di] = byte(mLen)
di++
}
+ // Check if we can load next values.
+ if si >= sn {
+ break
+ }
+ // Hash match end-2
+ h = blockHash(binary.LittleEndian.Uint64(src[si-2:]))
+ hashTable[h] = si - 2
}
- if anchor == 0 {
+lastLiterals:
+ if isNotCompressible && anchor == 0 {
// Incompressible.
return 0, nil
}
@@ -168,7 +217,7 @@
di++
// Write the last literals.
- if di >= anchor {
+ if isNotCompressible && di >= anchor {
// Incompressible.
return 0, nil
}
@@ -176,40 +225,60 @@
return di, nil
}
+// Pool of hash tables for CompressBlock.
+var htPool = sync.Pool{
+ New: func() interface{} {
+ return new([htSize]int)
+ },
+}
+
+// blockHashHC hashes 4 bytes into a value < winSize.
+func blockHashHC(x uint32) uint32 {
+ const hasher uint32 = 2654435761 // Knuth multiplicative hash.
+ return x * hasher >> (32 - winSizeLog)
+}
+
// CompressBlockHC compresses the source buffer src into the destination dst
// with max search depth (use 0 or negative value for no max).
//
// CompressBlockHC compression ratio is better than CompressBlock but it is also slower.
//
-// The size of the compressed data is returned. If it is 0 and no error, then the data is not compressible.
+// The size of the compressed data is returned.
+//
+// If the destination buffer size is lower than CompressBlockBound and
+// the compressed size is 0 and no error, then the data is incompressible.
//
// An error is returned if the destination buffer is too small.
-func CompressBlockHC(src, dst []byte, depth int) (di int, err error) {
- defer func() {
- if recover() != nil {
- err = ErrInvalidSourceShortBuffer
- }
- }()
+func CompressBlockHC(src, dst []byte, depth int) (_ int, err error) {
+ defer recoverBlock(&err)
- sn, dn := len(src)-mfLimit, len(dst)
- if sn <= 0 || dn == 0 {
- return 0, nil
- }
- var si int
+ // Return 0, nil only if the destination buffer size is < CompressBlockBound.
+ isNotCompressible := len(dst) < CompressBlockBound(len(src))
+
+ // adaptSkipLog sets how quickly the compressor begins skipping blocks when data is incompressible.
+ // This significantly speeds up incompressible data and usually has very small impact on compression.
+ // bytes to skip = 1 + (bytes since last match >> adaptSkipLog)
+ const adaptSkipLog = 7
+
+ var si, di, anchor int
// hashTable: stores the last position found for a given hash
- // chaingTable: stores previous positions for a given hash
+ // chainTable: stores previous positions for a given hash
var hashTable, chainTable [winSize]int
if depth <= 0 {
depth = winSize
}
- anchor := si
+ sn := len(src) - mfLimit
+ if sn <= 0 {
+ goto lastLiterals
+ }
+
for si < sn {
// Hash the next 4 bytes (sequence).
match := binary.LittleEndian.Uint32(src[si:])
- h := blockHash(match)
+ h := blockHashHC(match)
// Follow the chain until out of window and give the longest match.
mLen := 0
@@ -222,11 +291,15 @@
}
ml := 0
// Compare the current position with a previous with the same hash.
- for ml < sn-si && binary.LittleEndian.Uint64(src[next+ml:]) == binary.LittleEndian.Uint64(src[si+ml:]) {
- ml += 8
- }
- for ml < sn-si && src[next+ml] == src[si+ml] {
- ml++
+ for ml < sn-si {
+ x := binary.LittleEndian.Uint64(src[next+ml:]) ^ binary.LittleEndian.Uint64(src[si+ml:])
+ if x == 0 {
+ ml += 8
+ } else {
+					// Stop at the first non-zero byte.
+ ml += bits.TrailingZeros64(x) >> 3
+ break
+ }
}
if ml < minMatch || ml <= mLen {
			// Match too small (<minMatch) or smaller than the current match.
@@ -243,7 +316,7 @@
// No match found.
if mLen == 0 {
- si++
+ si += 1 + (si-anchor)>>adaptSkipLog
continue
}
@@ -257,7 +330,7 @@
for si, ml := winStart, si+mLen; si < ml; {
match >>= 8
match |= uint32(src[si+3]) << 24
- h := blockHash(match)
+ h := blockHashHC(match)
chainTable[si&winMask] = hashTable[h]
hashTable[h] = si
si++
@@ -308,12 +381,13 @@
}
}
- if anchor == 0 {
+ if isNotCompressible && anchor == 0 {
// Incompressible.
return 0, nil
}
// Last literals.
+lastLiterals:
lLen := len(src) - anchor
if lLen < 0xF {
dst[di] = byte(lLen << 4)
@@ -330,7 +404,7 @@
di++
// Write the last literals.
- if di >= anchor {
+ if isNotCompressible && di >= anchor {
// Incompressible.
return 0, nil
}
diff --git a/vendor/github.com/pierrec/lz4/decode_other.go b/vendor/github.com/pierrec/lz4/decode_other.go
index b83a19a..919888e 100644
--- a/vendor/github.com/pierrec/lz4/decode_other.go
+++ b/vendor/github.com/pierrec/lz4/decode_other.go
@@ -3,11 +3,10 @@
package lz4
func decodeBlock(dst, src []byte) (ret int) {
+ const hasError = -2
defer func() {
- // It is now faster to let the runtime panic and recover on out of bound slice access
- // than checking indices as we go along.
if recover() != nil {
- ret = -2
+ ret = hasError
}
}()
@@ -20,7 +19,7 @@
// Literals.
if lLen := b >> 4; lLen > 0 {
switch {
- case lLen < 0xF && di+18 < len(dst) && si+16 < len(src):
+ case lLen < 0xF && si+16 < len(src):
// Shortcut 1
// if we have enough room in src and dst, and the literals length
// is small enough (0..14) then copy all 16 bytes, even if not all
@@ -35,7 +34,13 @@
mLen += 4
if offset := int(src[si]) | int(src[si+1])<<8; mLen <= offset {
i := di - offset
- copy(dst[di:], dst[i:i+18])
+ end := i + 18
+ if end > len(dst) {
+ // The remaining buffer may not hold 18 bytes.
+ // See https://github.com/pierrec/lz4/issues/51.
+ end = len(dst)
+ }
+ copy(dst[di:], dst[i:end])
si += 2
di += mLen
continue
@@ -61,7 +66,7 @@
offset := int(src[si]) | int(src[si+1])<<8
if offset == 0 {
- return -2
+ return hasError
}
si += 2
@@ -90,6 +95,4 @@
}
di += copy(dst[di:di+mLen], expanded[:mLen])
}
-
- return di
}
diff --git a/vendor/github.com/pierrec/lz4/errors.go b/vendor/github.com/pierrec/lz4/errors.go
new file mode 100644
index 0000000..1c45d18
--- /dev/null
+++ b/vendor/github.com/pierrec/lz4/errors.go
@@ -0,0 +1,30 @@
+package lz4
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ rdebug "runtime/debug"
+)
+
+var (
+	// ErrInvalidSourceShortBuffer is returned by UncompressBlock or CompressBlock when a compressed
+ // block is corrupted or the destination buffer is not large enough for the uncompressed data.
+ ErrInvalidSourceShortBuffer = errors.New("lz4: invalid source or destination buffer too short")
+ // ErrInvalid is returned when reading an invalid LZ4 archive.
+ ErrInvalid = errors.New("lz4: bad magic number")
+ // ErrBlockDependency is returned when attempting to decompress an archive created with block dependency.
+ ErrBlockDependency = errors.New("lz4: block dependency not supported")
+ // ErrUnsupportedSeek is returned when attempting to Seek any way but forward from the current position.
+ ErrUnsupportedSeek = errors.New("lz4: can only seek forward from io.SeekCurrent")
+)
+
+func recoverBlock(e *error) {
+ if r := recover(); r != nil && *e == nil {
+ if debugFlag {
+ fmt.Fprintln(os.Stderr, r)
+ rdebug.PrintStack()
+ }
+ *e = ErrInvalidSourceShortBuffer
+ }
+}
diff --git a/vendor/github.com/pierrec/lz4/go.mod b/vendor/github.com/pierrec/lz4/go.mod
deleted file mode 100644
index f9f570a..0000000
--- a/vendor/github.com/pierrec/lz4/go.mod
+++ /dev/null
@@ -1,3 +0,0 @@
-module github.com/pierrec/lz4
-
-require github.com/pkg/profile v1.2.1
diff --git a/vendor/github.com/pierrec/lz4/go.sum b/vendor/github.com/pierrec/lz4/go.sum
deleted file mode 100644
index 6ca7598..0000000
--- a/vendor/github.com/pierrec/lz4/go.sum
+++ /dev/null
@@ -1,2 +0,0 @@
-github.com/pkg/profile v1.2.1 h1:F++O52m40owAmADcojzM+9gyjmMOY/T4oYJkgFDH8RE=
-github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
diff --git a/vendor/github.com/pierrec/lz4/internal/xxh32/xxh32zero.go b/vendor/github.com/pierrec/lz4/internal/xxh32/xxh32zero.go
index 850a6fd..7a76a6b 100644
--- a/vendor/github.com/pierrec/lz4/internal/xxh32/xxh32zero.go
+++ b/vendor/github.com/pierrec/lz4/internal/xxh32/xxh32zero.go
@@ -7,14 +7,15 @@
)
const (
- prime32_1 uint32 = 2654435761
- prime32_2 uint32 = 2246822519
- prime32_3 uint32 = 3266489917
- prime32_4 uint32 = 668265263
- prime32_5 uint32 = 374761393
+ prime1 uint32 = 2654435761
+ prime2 uint32 = 2246822519
+ prime3 uint32 = 3266489917
+ prime4 uint32 = 668265263
+ prime5 uint32 = 374761393
- prime32_1plus2 uint32 = 606290984
- prime32_minus1 uint32 = 1640531535
+ primeMask = 0xFFFFFFFF
+ prime1plus2 = uint32((uint64(prime1) + uint64(prime2)) & primeMask) // 606290984
+ prime1minus = uint32((-int64(prime1)) & primeMask) // 1640531535
)
// XXHZero represents an xxhash32 object with seed 0.
@@ -37,10 +38,10 @@
// Reset resets the Hash to its initial state.
func (xxh *XXHZero) Reset() {
- xxh.v1 = prime32_1plus2
- xxh.v2 = prime32_2
+ xxh.v1 = prime1plus2
+ xxh.v2 = prime2
xxh.v3 = 0
- xxh.v4 = prime32_minus1
+ xxh.v4 = prime1minus
xxh.totalLen = 0
xxh.bufused = 0
}
@@ -83,20 +84,20 @@
// fast rotl(13)
buf := xxh.buf[:16] // BCE hint.
- v1 = rol13(v1+binary.LittleEndian.Uint32(buf[:])*prime32_2) * prime32_1
- v2 = rol13(v2+binary.LittleEndian.Uint32(buf[4:])*prime32_2) * prime32_1
- v3 = rol13(v3+binary.LittleEndian.Uint32(buf[8:])*prime32_2) * prime32_1
- v4 = rol13(v4+binary.LittleEndian.Uint32(buf[12:])*prime32_2) * prime32_1
+ v1 = rol13(v1+binary.LittleEndian.Uint32(buf[:])*prime2) * prime1
+ v2 = rol13(v2+binary.LittleEndian.Uint32(buf[4:])*prime2) * prime1
+ v3 = rol13(v3+binary.LittleEndian.Uint32(buf[8:])*prime2) * prime1
+ v4 = rol13(v4+binary.LittleEndian.Uint32(buf[12:])*prime2) * prime1
p = r
xxh.bufused = 0
}
for n := n - 16; p <= n; p += 16 {
sub := input[p:][:16] //BCE hint for compiler
- v1 = rol13(v1+binary.LittleEndian.Uint32(sub[:])*prime32_2) * prime32_1
- v2 = rol13(v2+binary.LittleEndian.Uint32(sub[4:])*prime32_2) * prime32_1
- v3 = rol13(v3+binary.LittleEndian.Uint32(sub[8:])*prime32_2) * prime32_1
- v4 = rol13(v4+binary.LittleEndian.Uint32(sub[12:])*prime32_2) * prime32_1
+ v1 = rol13(v1+binary.LittleEndian.Uint32(sub[:])*prime2) * prime1
+ v2 = rol13(v2+binary.LittleEndian.Uint32(sub[4:])*prime2) * prime1
+ v3 = rol13(v3+binary.LittleEndian.Uint32(sub[8:])*prime2) * prime1
+ v4 = rol13(v4+binary.LittleEndian.Uint32(sub[12:])*prime2) * prime1
}
xxh.v1, xxh.v2, xxh.v3, xxh.v4 = v1, v2, v3, v4
@@ -112,25 +113,25 @@
if h32 >= 16 {
h32 += rol1(xxh.v1) + rol7(xxh.v2) + rol12(xxh.v3) + rol18(xxh.v4)
} else {
- h32 += prime32_5
+ h32 += prime5
}
p := 0
n := xxh.bufused
buf := xxh.buf
for n := n - 4; p <= n; p += 4 {
- h32 += binary.LittleEndian.Uint32(buf[p:p+4]) * prime32_3
- h32 = rol17(h32) * prime32_4
+ h32 += binary.LittleEndian.Uint32(buf[p:p+4]) * prime3
+ h32 = rol17(h32) * prime4
}
for ; p < n; p++ {
- h32 += uint32(buf[p]) * prime32_5
- h32 = rol11(h32) * prime32_1
+ h32 += uint32(buf[p]) * prime5
+ h32 = rol11(h32) * prime1
}
h32 ^= h32 >> 15
- h32 *= prime32_2
+ h32 *= prime2
h32 ^= h32 >> 13
- h32 *= prime32_3
+ h32 *= prime3
h32 ^= h32 >> 16
return h32
@@ -142,19 +143,19 @@
h32 := uint32(n)
if n < 16 {
- h32 += prime32_5
+ h32 += prime5
} else {
- v1 := prime32_1plus2
- v2 := prime32_2
+ v1 := prime1plus2
+ v2 := prime2
v3 := uint32(0)
- v4 := prime32_minus1
+ v4 := prime1minus
p := 0
for n := n - 16; p <= n; p += 16 {
sub := input[p:][:16] //BCE hint for compiler
- v1 = rol13(v1+binary.LittleEndian.Uint32(sub[:])*prime32_2) * prime32_1
- v2 = rol13(v2+binary.LittleEndian.Uint32(sub[4:])*prime32_2) * prime32_1
- v3 = rol13(v3+binary.LittleEndian.Uint32(sub[8:])*prime32_2) * prime32_1
- v4 = rol13(v4+binary.LittleEndian.Uint32(sub[12:])*prime32_2) * prime32_1
+ v1 = rol13(v1+binary.LittleEndian.Uint32(sub[:])*prime2) * prime1
+ v2 = rol13(v2+binary.LittleEndian.Uint32(sub[4:])*prime2) * prime1
+ v3 = rol13(v3+binary.LittleEndian.Uint32(sub[8:])*prime2) * prime1
+ v4 = rol13(v4+binary.LittleEndian.Uint32(sub[12:])*prime2) * prime1
}
input = input[p:]
n -= p
@@ -163,19 +164,19 @@
p := 0
for n := n - 4; p <= n; p += 4 {
- h32 += binary.LittleEndian.Uint32(input[p:p+4]) * prime32_3
- h32 = rol17(h32) * prime32_4
+ h32 += binary.LittleEndian.Uint32(input[p:p+4]) * prime3
+ h32 = rol17(h32) * prime4
}
for p < n {
- h32 += uint32(input[p]) * prime32_5
- h32 = rol11(h32) * prime32_1
+ h32 += uint32(input[p]) * prime5
+ h32 = rol11(h32) * prime1
p++
}
h32 ^= h32 >> 15
- h32 *= prime32_2
+ h32 *= prime2
h32 ^= h32 >> 13
- h32 *= prime32_3
+ h32 *= prime3
h32 ^= h32 >> 16
return h32
@@ -183,12 +184,12 @@
// Uint32Zero hashes x with seed 0.
func Uint32Zero(x uint32) uint32 {
- h := prime32_5 + 4 + x*prime32_3
- h = rol17(h) * prime32_4
+ h := prime5 + 4 + x*prime3
+ h = rol17(h) * prime4
h ^= h >> 15
- h *= prime32_2
+ h *= prime2
h ^= h >> 13
- h *= prime32_3
+ h *= prime3
h ^= h >> 16
return h
}
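
Editorial note: the xxh32 changes are a pure rename plus deriving the two composite constants instead of hard-coding them. A standalone check (not part of the change) that the derived values equal the old literals prime32_1plus2 and prime32_minus1:

```go
package main

import "fmt"

const (
	prime1 uint32 = 2654435761
	prime2 uint32 = 2246822519

	primeMask = 0xFFFFFFFF
	// Derived exactly as in the diff above.
	prime1plus2 = uint32((uint64(prime1) + uint64(prime2)) & primeMask) // 606290984
	prime1minus = uint32((-int64(prime1)) & primeMask)                  // 1640531535
)

func main() {
	fmt.Println(prime1plus2 == 606290984)  // true
	fmt.Println(prime1minus == 1640531535) // true
}
```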
diff --git a/vendor/github.com/pierrec/lz4/lz4.go b/vendor/github.com/pierrec/lz4/lz4.go
index 3580275..a3284bd 100644
--- a/vendor/github.com/pierrec/lz4/lz4.go
+++ b/vendor/github.com/pierrec/lz4/lz4.go
@@ -10,14 +10,20 @@
//
package lz4
+import (
+ "math/bits"
+ "sync"
+)
+
const (
// Extension is the LZ4 frame file name extension
Extension = ".lz4"
// Version is the LZ4 frame format version
Version = 1
- frameMagic uint32 = 0x184D2204
- frameSkipMagic uint32 = 0x184D2A50
+ frameMagic uint32 = 0x184D2204
+ frameSkipMagic uint32 = 0x184D2A50
+ frameMagicLegacy uint32 = 0x184C2102
// The following constants are used to setup the compression algorithm.
minMatch = 4 // the minimum size of the match sequence size (4 bytes)
@@ -30,26 +36,63 @@
// hashLog determines the size of the hash table used to quickly find a previous match position.
// Its value influences the compression speed and memory usage, the lower the faster,
// but at the expense of the compression ratio.
- // 16 seems to be the best compromise.
- hashLog = 16
- hashTableSize = 1 << hashLog
- hashShift = uint((minMatch * 8) - hashLog)
+ // 16 seems to be the best compromise for fast compression.
+ hashLog = 16
+ htSize = 1 << hashLog
- mfLimit = 8 + minMatch // The last match cannot start within the last 12 bytes.
- skipStrength = 6 // variable step for fast scan
+ mfLimit = 10 + minMatch // The last match cannot start within the last 14 bytes.
)
// map the block max size id with its value in bytes: 64Kb, 256Kb, 1Mb and 4Mb.
-var (
- bsMapID = map[byte]int{4: 64 << 10, 5: 256 << 10, 6: 1 << 20, 7: 4 << 20}
- bsMapValue = make(map[int]byte, len(bsMapID))
+const (
+ blockSize64K = 1 << (16 + 2*iota)
+ blockSize256K
+ blockSize1M
+ blockSize4M
)
-// Reversed.
-func init() {
- for i, v := range bsMapID {
- bsMapValue[v] = i
+var (
+ // Keep a pool of buffers for each valid block sizes.
+ bsMapValue = [...]*sync.Pool{
+ newBufferPool(2 * blockSize64K),
+ newBufferPool(2 * blockSize256K),
+ newBufferPool(2 * blockSize1M),
+ newBufferPool(2 * blockSize4M),
}
+)
+
+// newBufferPool returns a pool for buffers of the given size.
+func newBufferPool(size int) *sync.Pool {
+ return &sync.Pool{
+ New: func() interface{} {
+ return make([]byte, size)
+ },
+ }
+}
+
+// getBuffer fetches a buffer from the pool matching the given size.
+func getBuffer(size int) []byte {
+ idx := blockSizeValueToIndex(size) - 4
+ return bsMapValue[idx].Get().([]byte)
+}
+
+// putBuffer returns a buffer to its pool.
+func putBuffer(size int, buf []byte) {
+ if cap(buf) > 0 {
+ idx := blockSizeValueToIndex(size) - 4
+ bsMapValue[idx].Put(buf[:cap(buf)])
+ }
+}
+func blockSizeIndexToValue(i byte) int {
+ return 1 << (16 + 2*uint(i))
+}
+func isValidBlockSize(size int) bool {
+ const blockSizeMask = blockSize64K | blockSize256K | blockSize1M | blockSize4M
+
+ return size&blockSizeMask > 0 && bits.OnesCount(uint(size)) == 1
+}
+func blockSizeValueToIndex(size int) byte {
+ return 4 + byte(bits.TrailingZeros(uint(size)>>16)/2)
}
// Header describes the various flags that can be set on a Writer or obtained from a Reader.
@@ -57,7 +100,7 @@
// (http://fastcompression.blogspot.com/2013/04/lz4-streaming-format-final.html).
//
// NB. in a Reader, in case of concatenated frames, the Header values may change between Read() calls.
-// It is the caller responsibility to check them if necessary.
+// It is the caller's responsibility to check them if necessary.
type Header struct {
BlockChecksum bool // Compressed blocks checksum flag.
NoChecksum bool // Frame checksum flag.
@@ -66,3 +109,8 @@
CompressionLevel int // Compression level (higher is better, use 0 for fastest compression).
done bool // Header processed flag (Read or Write and checked).
}
+
+// Reset resets the internal done flag, allowing the Header to be reused.
+func (h *Header) Reset() {
+ h.done = false
+}
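
Editorial note: the map-based block size lookup (bsMapID/bsMapValue) is replaced above by bit arithmetic. A standalone sketch replicating the new validity check, showing it accepts exactly the four frame block sizes:

```go
package main

import (
	"fmt"
	"math/bits"
)

// Same constants as the diff: 64K, 256K, 1M, 4M.
const (
	blockSize64K = 1 << (16 + 2*iota)
	blockSize256K
	blockSize1M
	blockSize4M
)

// Valid iff the size has a single set bit and that bit is one of the four allowed ones.
func isValidBlockSize(size int) bool {
	const blockSizeMask = blockSize64K | blockSize256K | blockSize1M | blockSize4M
	return size&blockSizeMask > 0 && bits.OnesCount(uint(size)) == 1
}

func main() {
	for _, s := range []int{64 << 10, 128 << 10, 256 << 10, 1 << 20, 4 << 20} {
		fmt.Println(s, isValidBlockSize(s)) // 128K is rejected, the rest accepted
	}
}
```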
diff --git a/vendor/github.com/pierrec/lz4/reader.go b/vendor/github.com/pierrec/lz4/reader.go
index 81efdbf..87dd72b 100644
--- a/vendor/github.com/pierrec/lz4/reader.go
+++ b/vendor/github.com/pierrec/lz4/reader.go
@@ -14,6 +14,9 @@
// The Header may change between Read() calls in case of concatenated frames.
type Reader struct {
Header
+ // Handler called when a block has been successfully read.
+ // It provides the number of bytes read.
+ OnBlockDone func(size int)
buf [8]byte // Scrap buffer.
pos int64 // Current position in src.
@@ -22,6 +25,8 @@
data []byte // Uncompressed data.
idx int // Index of unread bytes into data.
checksum xxh32.XXHZero // Frame hash.
+ skip int64 // Bytes to skip before next read.
+ dpos int64 // Position in dest
}
// NewReader returns a new LZ4 frame decoder.
@@ -76,17 +81,17 @@
return fmt.Errorf("lz4: invalid version: got %d; expected %d", v, Version)
}
if b>>5&1 == 0 {
- return fmt.Errorf("lz4: block dependency not supported")
+ return ErrBlockDependency
}
z.BlockChecksum = b>>4&1 > 0
frameSize := b>>3&1 > 0
z.NoChecksum = b>>2&1 == 0
bmsID := buf[1] >> 4 & 0x7
- bSize, ok := bsMapID[bmsID]
- if !ok {
+ if bmsID < 4 || bmsID > 7 {
return fmt.Errorf("lz4: invalid block max size ID: %d", bmsID)
}
+ bSize := blockSizeIndexToValue(bmsID - 4)
z.BlockMaxSize = bSize
// Allocate the compressed/uncompressed buffers.
@@ -101,7 +106,7 @@
z.data = z.zdata[:cap(z.zdata)][bSize:]
z.idx = len(z.data)
- z.checksum.Write(buf[0:2])
+ _, _ = z.checksum.Write(buf[0:2])
if frameSize {
buf := buf[:8]
@@ -110,7 +115,7 @@
}
z.Size = binary.LittleEndian.Uint64(buf)
z.pos += 8
- z.checksum.Write(buf)
+ _, _ = z.checksum.Write(buf)
}
// Header checksum.
@@ -211,6 +216,9 @@
return 0, err
}
z.pos += int64(bLen)
+ if z.OnBlockDone != nil {
+ z.OnBlockDone(int(bLen))
+ }
if z.BlockChecksum {
checksum, err := z.readUint32()
@@ -255,10 +263,13 @@
return 0, err
}
z.data = z.data[:n]
+ if z.OnBlockDone != nil {
+ z.OnBlockDone(n)
+ }
}
if !z.NoChecksum {
- z.checksum.Write(z.data)
+ _, _ = z.checksum.Write(z.data)
if debugFlag {
debug("current frame checksum %x", z.checksum.Sum32())
}
@@ -266,8 +277,20 @@
z.idx = 0
}
+ if z.skip > int64(len(z.data[z.idx:])) {
+ z.skip -= int64(len(z.data[z.idx:]))
+ z.dpos += int64(len(z.data[z.idx:]))
+ z.idx = len(z.data)
+ return 0, nil
+ }
+
+ z.idx += int(z.skip)
+ z.dpos += z.skip
+ z.skip = 0
+
n := copy(buf, z.data[z.idx:])
z.idx += n
+ z.dpos += int64(n)
if debugFlag {
debug("copied %d bytes to input", n)
}
@@ -275,6 +298,20 @@
return n, nil
}
+// Seek implements io.Seeker, but supports seeking forward from the current
+// position only. Any other seek will return an error. Allows skipping output
+// bytes which aren't needed, which in some scenarios is faster than reading
+// and discarding them.
+// Note this may cause future calls to Read() to read 0 bytes if all of the
+// data they would have returned is skipped.
+func (z *Reader) Seek(offset int64, whence int) (int64, error) {
+ if offset < 0 || whence != io.SeekCurrent {
+ return z.dpos + z.skip, ErrUnsupportedSeek
+ }
+ z.skip += offset
+ return z.dpos + z.skip, nil
+}
+
// Reset discards the Reader's state and makes it equivalent to the
// result of its original state from NewReader, but reading from r instead.
// This permits reusing a Reader rather than allocating a new one.
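
Editorial note: a minimal usage sketch (not part of the diff) for the two Reader additions above, the OnBlockDone callback and forward-only Seek; the input file name is hypothetical:

```go
package main

import (
	"fmt"
	"io"
	"os"

	"github.com/pierrec/lz4"
)

func main() {
	f, err := os.Open("data.lz4") // hypothetical input
	if err != nil {
		panic(err)
	}
	defer f.Close()

	zr := lz4.NewReader(f)
	zr.OnBlockDone = func(n int) {
		fmt.Fprintf(os.Stderr, "block decoded: %d bytes\n", n)
	}

	// Skip the first 1024 decompressed bytes without copying them out.
	if _, err := zr.Seek(1024, io.SeekCurrent); err != nil {
		panic(err)
	}
	if _, err := io.Copy(os.Stdout, zr); err != nil {
		panic(err)
	}
}
```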
diff --git a/vendor/github.com/pierrec/lz4/reader_legacy.go b/vendor/github.com/pierrec/lz4/reader_legacy.go
new file mode 100644
index 0000000..1670a77
--- /dev/null
+++ b/vendor/github.com/pierrec/lz4/reader_legacy.go
@@ -0,0 +1,207 @@
+package lz4
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+)
+
+// ReaderLegacy implements the LZ4Demo frame decoder.
+// The Header is set after the first call to Read().
+type ReaderLegacy struct {
+ Header
+ // Handler called when a block has been successfully read.
+ // It provides the number of bytes read.
+ OnBlockDone func(size int)
+
+ lastBlock bool
+ buf [8]byte // Scrap buffer.
+ pos int64 // Current position in src.
+ src io.Reader // Source.
+ zdata []byte // Compressed data.
+ data []byte // Uncompressed data.
+ idx int // Index of unread bytes into data.
+ skip int64 // Bytes to skip before next read.
+ dpos int64 // Position in dest
+}
+
+// NewReaderLegacy returns a new LZ4Demo frame decoder.
+// No access to the underlying io.Reader is performed.
+func NewReaderLegacy(src io.Reader) *ReaderLegacy {
+ r := &ReaderLegacy{src: src}
+ return r
+}
+
+// readLegacyHeader checks the legacy frame magic number and parses the frame descriptor.
+// Skippable frames are supported even as a first frame, although the LZ4
+// specification recommends that skippable frames not be used as first frames.
+func (z *ReaderLegacy) readLegacyHeader() error {
+ z.lastBlock = false
+ magic, err := z.readUint32()
+ if err != nil {
+ z.pos += 4
+ if err == io.ErrUnexpectedEOF {
+ return io.EOF
+ }
+ return err
+ }
+ if magic != frameMagicLegacy {
+ return ErrInvalid
+ }
+ z.pos += 4
+
+	// Legacy frames have a fixed 8MB block size
+ // https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md#legacy-frame
+ bSize := blockSize4M * 2
+
+ // Allocate the compressed/uncompressed buffers.
+ // The compressed buffer cannot exceed the uncompressed one.
+ if n := 2 * bSize; cap(z.zdata) < n {
+ z.zdata = make([]byte, n, n)
+ }
+ if debugFlag {
+ debug("header block max size size=%d", bSize)
+ }
+ z.zdata = z.zdata[:bSize]
+ z.data = z.zdata[:cap(z.zdata)][bSize:]
+ z.idx = len(z.data)
+
+ z.Header.done = true
+ if debugFlag {
+ debug("header read: %v", z.Header)
+ }
+
+ return nil
+}
+
+// Read decompresses data from the underlying source into the supplied buffer.
+//
+// Since there can be multiple streams concatenated, Header values may
+// change between calls to Read(). If that is the case, no data is actually read from
+// the underlying io.Reader, to allow for potential input buffer resizing.
+func (z *ReaderLegacy) Read(buf []byte) (int, error) {
+ if debugFlag {
+ debug("Read buf len=%d", len(buf))
+ }
+ if !z.Header.done {
+ if err := z.readLegacyHeader(); err != nil {
+ return 0, err
+ }
+ if debugFlag {
+ debug("header read OK compressed buffer %d / %d uncompressed buffer %d : %d index=%d",
+ len(z.zdata), cap(z.zdata), len(z.data), cap(z.data), z.idx)
+ }
+ }
+
+ if len(buf) == 0 {
+ return 0, nil
+ }
+
+ if z.idx == len(z.data) {
+ // No data ready for reading, process the next block.
+ if debugFlag {
+ debug(" reading block from writer %d %d", z.idx, blockSize4M*2)
+ }
+
+ // Reset uncompressed buffer
+ z.data = z.zdata[:cap(z.zdata)][len(z.zdata):]
+
+ bLen, err := z.readUint32()
+ if err != nil {
+ return 0, err
+ }
+ if debugFlag {
+ debug(" bLen %d (0x%x) offset = %d (0x%x)", bLen, bLen, z.pos, z.pos)
+ }
+ z.pos += 4
+
+ // Legacy blocks are always compressed, even when detrimental
+ if debugFlag {
+ debug(" compressed block size %d", bLen)
+ }
+
+ if int(bLen) > cap(z.data) {
+ return 0, fmt.Errorf("lz4: invalid block size: %d", bLen)
+ }
+ zdata := z.zdata[:bLen]
+ if _, err := io.ReadFull(z.src, zdata); err != nil {
+ return 0, err
+ }
+ z.pos += int64(bLen)
+
+ n, err := UncompressBlock(zdata, z.data)
+ if err != nil {
+ return 0, err
+ }
+
+ z.data = z.data[:n]
+ if z.OnBlockDone != nil {
+ z.OnBlockDone(n)
+ }
+
+ z.idx = 0
+
+		// Legacy blocks are fixed to 8MB; if we read a decompressed block smaller than this,
+		// it means we've reached the end...
+ if n < blockSize4M*2 {
+ z.lastBlock = true
+ }
+ }
+
+ if z.skip > int64(len(z.data[z.idx:])) {
+ z.skip -= int64(len(z.data[z.idx:]))
+ z.dpos += int64(len(z.data[z.idx:]))
+ z.idx = len(z.data)
+ return 0, nil
+ }
+
+ z.idx += int(z.skip)
+ z.dpos += z.skip
+ z.skip = 0
+
+ n := copy(buf, z.data[z.idx:])
+ z.idx += n
+ z.dpos += int64(n)
+ if debugFlag {
+ debug("%v] copied %d bytes to input (%d:%d)", z.lastBlock, n, z.idx, len(z.data))
+ }
+ if z.lastBlock && len(z.data) == z.idx {
+ return n, io.EOF
+ }
+ return n, nil
+}
+
+// Seek implements io.Seeker, but supports seeking forward from the current
+// position only. Any other seek will return an error. Allows skipping output
+// bytes which aren't needed, which in some scenarios is faster than reading
+// and discarding them.
+// Note this may cause future calls to Read() to read 0 bytes if all of the
+// data they would have returned is skipped.
+func (z *ReaderLegacy) Seek(offset int64, whence int) (int64, error) {
+ if offset < 0 || whence != io.SeekCurrent {
+ return z.dpos + z.skip, ErrUnsupportedSeek
+ }
+ z.skip += offset
+ return z.dpos + z.skip, nil
+}
+
+// Reset discards the Reader's state and makes it equivalent to the
+// result of its original state from NewReader, but reading from r instead.
+// This permits reusing a Reader rather than allocating a new one.
+func (z *ReaderLegacy) Reset(r io.Reader) {
+ z.Header = Header{}
+ z.pos = 0
+ z.src = r
+ z.zdata = z.zdata[:0]
+ z.data = z.data[:0]
+ z.idx = 0
+}
+
+// readUint32 reads a uint32 into the supplied buffer.
+// The idea is to make use of the already allocated buffers avoiding additional allocations.
+func (z *ReaderLegacy) readUint32() (uint32, error) {
+ buf := z.buf[:4]
+ _, err := io.ReadFull(z.src, buf)
+ x := binary.LittleEndian.Uint32(buf)
+ return x, err
+}
diff --git a/vendor/github.com/pierrec/lz4/writer.go b/vendor/github.com/pierrec/lz4/writer.go
index 0120438..f066d56 100644
--- a/vendor/github.com/pierrec/lz4/writer.go
+++ b/vendor/github.com/pierrec/lz4/writer.go
@@ -4,21 +4,35 @@
"encoding/binary"
"fmt"
"io"
+ "runtime"
"github.com/pierrec/lz4/internal/xxh32"
)
+// zResult contains the results of compressing a block.
+type zResult struct {
+ size uint32 // Block header
+ data []byte // Compressed data
+ checksum uint32 // Data checksum
+}
+
// Writer implements the LZ4 frame encoder.
type Writer struct {
Header
+ // Handler called when a block has been successfully written out.
+ // It provides the number of bytes written.
+ OnBlockDone func(size int)
buf [19]byte // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
dst io.Writer // Destination.
checksum xxh32.XXHZero // Frame checksum.
- zdata []byte // Compressed data.
- data []byte // Data to be compressed.
+ data []byte // Data to be compressed + buffer for compressed data.
idx int // Index into data.
hashtable [winSize]int // Hash table used in CompressBlock().
+
+ // For concurrency.
+ c chan chan zResult // Channel for block compression goroutines and writer goroutine.
+ err error // Any error encountered while writing to the underlying destination.
}
// NewWriter returns a new LZ4 frame encoder.
@@ -26,28 +40,90 @@
// The supplied Header is checked at the first Write.
// It is ok to change it before the first Write but then not until a Reset() is performed.
func NewWriter(dst io.Writer) *Writer {
- return &Writer{dst: dst}
+ z := new(Writer)
+ z.Reset(dst)
+ return z
+}
+
+// WithConcurrency sets the number of concurrent goroutines used for compression.
+// A negative value sets the concurrency to GOMAXPROCS.
+func (z *Writer) WithConcurrency(n int) *Writer {
+ switch {
+ case n == 0 || n == 1:
+ z.c = nil
+ return z
+ case n < 0:
+ n = runtime.GOMAXPROCS(0)
+ }
+ z.c = make(chan chan zResult, n)
+ // Writer goroutine managing concurrent block compression goroutines.
+ go func() {
+ // Process next block compression item.
+ for c := range z.c {
+ // Read the next compressed block result.
+ // Waiting here ensures that the blocks are output in the order they were sent.
+ // The incoming channel is always closed as it indicates to the caller that
+ // the block has been processed.
+ res := <-c
+ n := len(res.data)
+ if n == 0 {
+ // Notify the block compression routine that we are done with its result.
+ // This is used when a sentinel block is sent to terminate the compression.
+ close(c)
+ return
+ }
+ // Write the block.
+ if err := z.writeUint32(res.size); err != nil && z.err == nil {
+ z.err = err
+ }
+ if _, err := z.dst.Write(res.data); err != nil && z.err == nil {
+ z.err = err
+ }
+ if z.BlockChecksum {
+ if err := z.writeUint32(res.checksum); err != nil && z.err == nil {
+ z.err = err
+ }
+ }
+ // It is now safe to release the buffer as no longer in use by any goroutine.
+ putBuffer(cap(res.data), res.data)
+ if h := z.OnBlockDone; h != nil {
+ h(n)
+ }
+ close(c)
+ }
+ }()
+ return z
+}
+
+// newBuffers fetches a pooled buffer sized for the Header's block max size;
+// its first half holds the uncompressed data and its second half the compressed data.
+func (z *Writer) newBuffers() {
+ bSize := z.Header.BlockMaxSize
+ buf := getBuffer(bSize)
+ z.data = buf[:bSize] // Uncompressed buffer is the first half.
+}
+
+// freeBuffers puts the writer's buffers back to the pool.
+func (z *Writer) freeBuffers() {
+ // Put the buffer back into the pool, if any.
+ putBuffer(z.Header.BlockMaxSize, z.data)
+ z.data = nil
}
// writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
func (z *Writer) writeHeader() error {
// Default to 4Mb if BlockMaxSize is not set.
if z.Header.BlockMaxSize == 0 {
- z.Header.BlockMaxSize = bsMapID[7]
+ z.Header.BlockMaxSize = blockSize4M
}
// The only option that needs to be validated.
bSize := z.Header.BlockMaxSize
- bSizeID, ok := bsMapValue[bSize]
- if !ok {
+ if !isValidBlockSize(z.Header.BlockMaxSize) {
return fmt.Errorf("lz4: invalid block max size: %d", bSize)
}
// Allocate the compressed/uncompressed buffers.
// The compressed buffer cannot exceed the uncompressed one.
- if n := 2 * bSize; cap(z.zdata) < n {
- z.zdata = make([]byte, n, n)
- }
- z.zdata = z.zdata[:bSize]
- z.data = z.zdata[:cap(z.zdata)][bSize:]
+ z.newBuffers()
z.idx = 0
// Size is optional.
@@ -67,7 +143,7 @@
flg |= 1 << 2
}
buf[4] = flg
- buf[5] = bSizeID << 4
+ buf[5] = blockSizeValueToIndex(z.Header.BlockMaxSize) << 4
// Current buffer size: magic(4) + flags(1) + block max size (1).
n := 6
@@ -147,28 +223,39 @@
// compressBlock compresses a block.
func (z *Writer) compressBlock(data []byte) error {
if !z.NoChecksum {
- z.checksum.Write(data)
+ _, _ = z.checksum.Write(data)
}
+ if z.c != nil {
+ c := make(chan zResult)
+ z.c <- c // Send now to guarantee order
+
+ // get a buffer from the pool and copy the data over
+ block := getBuffer(z.Header.BlockMaxSize)[:len(data)]
+ copy(block, data)
+
+ go writerCompressBlock(c, z.Header, block)
+ return nil
+ }
+
+ zdata := z.data[z.Header.BlockMaxSize:cap(z.data)]
// The compressed block size cannot exceed the input's.
var zn int
- var err error
if level := z.Header.CompressionLevel; level != 0 {
- zn, err = CompressBlockHC(data, z.zdata, level)
+ zn, _ = CompressBlockHC(data, zdata, level)
} else {
- zn, err = CompressBlock(data, z.zdata, z.hashtable[:])
+ zn, _ = CompressBlock(data, zdata, z.hashtable[:])
}
- var zdata []byte
var bLen uint32
if debugFlag {
debug("block compression %d => %d", len(data), zn)
}
- if err == nil && zn > 0 && zn < len(data) {
+ if zn > 0 && zn < len(data) {
// Compressible and compressed size smaller than uncompressed: ok!
bLen = uint32(zn)
- zdata = z.zdata[:zn]
+ zdata = zdata[:zn]
} else {
// Uncompressed block.
bLen = uint32(len(data)) | compressedBlockFlag
@@ -182,24 +269,26 @@
if err := z.writeUint32(bLen); err != nil {
return err
}
- if _, err := z.dst.Write(zdata); err != nil {
+ written, err := z.dst.Write(zdata)
+ if err != nil {
return err
}
+ if h := z.OnBlockDone; h != nil {
+ h(written)
+ }
- if z.BlockChecksum {
- checksum := xxh32.ChecksumZero(zdata)
+ if !z.BlockChecksum {
if debugFlag {
- debug("block checksum %x", checksum)
+ debug("current frame checksum %x", z.checksum.Sum32())
}
- if err := z.writeUint32(checksum); err != nil {
- return err
- }
+ return nil
}
+ checksum := xxh32.ChecksumZero(zdata)
if debugFlag {
- debug("current frame checksum %x", z.checksum.Sum32())
+ debug("block checksum %x", checksum)
+ defer func() { debug("current frame checksum %x", z.checksum.Sum32()) }()
}
-
- return nil
+ return z.writeUint32(checksum)
}
// Flush flushes any pending compressed data to the underlying writer.
@@ -213,7 +302,35 @@
return nil
}
- return z.compressBlock(z.data[:z.idx])
+ data := getBuffer(z.Header.BlockMaxSize)[:len(z.data[:z.idx])]
+ copy(data, z.data[:z.idx])
+
+ z.idx = 0
+ if z.c == nil {
+ return z.compressBlock(data)
+ }
+ if !z.NoChecksum {
+ _, _ = z.checksum.Write(data)
+ }
+ c := make(chan zResult)
+ z.c <- c
+ writerCompressBlock(c, z.Header, data)
+ return nil
+}
+
+func (z *Writer) close() error {
+ if z.c == nil {
+ return nil
+ }
+ // Send a sentinel block (no data to compress) to terminate the writer main goroutine.
+ c := make(chan zResult)
+ z.c <- c
+ c <- zResult{}
+ // Wait for the main goroutine to complete.
+ <-c
+ // At this point the main goroutine has shut down or is about to return.
+ z.c = nil
+ return z.err
}
// Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
@@ -223,10 +340,13 @@
return err
}
}
-
if err := z.Flush(); err != nil {
return err
}
+ if err := z.close(); err != nil {
+ return err
+ }
+ z.freeBuffers()
if debugFlag {
debug("writing last empty block")
@@ -234,28 +354,33 @@
if err := z.writeUint32(0); err != nil {
return err
}
- if !z.NoChecksum {
- checksum := z.checksum.Sum32()
- if debugFlag {
- debug("stream checksum %x", checksum)
- }
- if err := z.writeUint32(checksum); err != nil {
- return err
- }
+ if z.NoChecksum {
+ return nil
}
- return nil
+ checksum := z.checksum.Sum32()
+ if debugFlag {
+ debug("stream checksum %x", checksum)
+ }
+ return z.writeUint32(checksum)
}
// Reset clears the state of the Writer z such that it is equivalent to its
// initial state from NewWriter, but instead writing to w.
// No access to the underlying io.Writer is performed.
func (z *Writer) Reset(w io.Writer) {
- z.Header = Header{}
+ n := cap(z.c)
+ _ = z.close()
+ z.freeBuffers()
+ z.Header.Reset()
z.dst = w
z.checksum.Reset()
- z.zdata = z.zdata[:0]
- z.data = z.data[:0]
z.idx = 0
+ z.err = nil
+ // reset hashtable to ensure deterministic output.
+ for i := range z.hashtable {
+ z.hashtable[i] = 0
+ }
+ z.WithConcurrency(n)
}
// writeUint32 writes a uint32 to the underlying writer.
@@ -265,3 +390,33 @@
_, err := z.dst.Write(buf)
return err
}
+
+// writerCompressBlock compresses data into a pooled buffer and writes its result
+// out to the input channel.
+func writerCompressBlock(c chan zResult, header Header, data []byte) {
+ zdata := getBuffer(header.BlockMaxSize)
+ // The compressed block size cannot exceed the input's.
+ var zn int
+ if level := header.CompressionLevel; level != 0 {
+ zn, _ = CompressBlockHC(data, zdata, level)
+ } else {
+ var hashTable [winSize]int
+ zn, _ = CompressBlock(data, zdata, hashTable[:])
+ }
+ var res zResult
+ if zn > 0 && zn < len(data) {
+ res.size = uint32(zn)
+ res.data = zdata[:zn]
+ // release the uncompressed block since it is not used anymore
+ putBuffer(header.BlockMaxSize, data)
+ } else {
+ res.size = uint32(len(data)) | compressedBlockFlag
+ res.data = data
+ // release the compressed block since it was not used
+ putBuffer(header.BlockMaxSize, zdata)
+ }
+ if header.BlockChecksum {
+ res.checksum = xxh32.ChecksumZero(res.data)
+ }
+ c <- res
+}
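
Editorial note: a minimal sketch (not part of the diff) of the new concurrent Writer path added above; WithConcurrency(-1) uses GOMAXPROCS block-compression goroutines, and the output file name is hypothetical:

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"

	"github.com/pierrec/lz4"
)

func main() {
	out, err := os.Create("data.lz4") // hypothetical output
	if err != nil {
		panic(err)
	}
	defer out.Close()

	zw := lz4.NewWriter(out).WithConcurrency(-1)
	zw.OnBlockDone = func(n int) {
		fmt.Fprintf(os.Stderr, "block written: %d bytes\n", n)
	}

	if _, err := io.Copy(zw, strings.NewReader("hello world")); err != nil {
		panic(err)
	}
	// Close flushes pending data, stops the writer goroutine, and writes
	// the end mark plus the stream checksum.
	if err := zw.Close(); err != nil {
		panic(err)
	}
}
```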
diff --git a/vendor/github.com/pierrec/lz4/writer_legacy.go b/vendor/github.com/pierrec/lz4/writer_legacy.go
new file mode 100644
index 0000000..ca8dc8c
--- /dev/null
+++ b/vendor/github.com/pierrec/lz4/writer_legacy.go
@@ -0,0 +1,182 @@
+package lz4
+
+import (
+ "encoding/binary"
+ "io"
+)
+
+// WriterLegacy implements the LZ4Demo frame encoder.
+type WriterLegacy struct {
+ Header
+	// Handler called when a block has been successfully written out.
+	// It provides the number of bytes written.
+ OnBlockDone func(size int)
+
+ dst io.Writer // Destination.
+ data []byte // Data to be compressed + buffer for compressed data.
+ idx int // Index into data.
+ hashtable [winSize]int // Hash table used in CompressBlock().
+}
+
+// NewWriterLegacy returns a new LZ4 encoder for the legacy frame format.
+// No access to the underlying io.Writer is performed.
+// The supplied Header is checked at the first Write.
+// It is ok to change it before the first Write but then not until a Reset() is performed.
+func NewWriterLegacy(dst io.Writer) *WriterLegacy {
+ z := new(WriterLegacy)
+ z.Reset(dst)
+ return z
+}
+
+// Write compresses data from the supplied buffer into the underlying io.Writer.
+// Write does not return until the data has been written.
+func (z *WriterLegacy) Write(buf []byte) (int, error) {
+ if !z.Header.done {
+ if err := z.writeHeader(); err != nil {
+ return 0, err
+ }
+ }
+ if debugFlag {
+ debug("input buffer len=%d index=%d", len(buf), z.idx)
+ }
+
+ zn := len(z.data)
+ var n int
+ for len(buf) > 0 {
+ if z.idx == 0 && len(buf) >= zn {
+ // Avoid a copy as there is enough data for a block.
+ if err := z.compressBlock(buf[:zn]); err != nil {
+ return n, err
+ }
+ n += zn
+ buf = buf[zn:]
+ continue
+ }
+ // Accumulate the data to be compressed.
+ m := copy(z.data[z.idx:], buf)
+ n += m
+ z.idx += m
+ buf = buf[m:]
+ if debugFlag {
+ debug("%d bytes copied to buf, current index %d", n, z.idx)
+ }
+
+ if z.idx < len(z.data) {
+ // Buffer not filled.
+ if debugFlag {
+ debug("need more data for compression")
+ }
+ return n, nil
+ }
+
+ // Buffer full.
+ if err := z.compressBlock(z.data); err != nil {
+ return n, err
+ }
+ z.idx = 0
+ }
+
+ return n, nil
+}
+
+// writeHeader builds and writes the header to the underlying io.Writer.
+func (z *WriterLegacy) writeHeader() error {
+	// Legacy frames have a fixed 8MB block size
+ // https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md#legacy-frame
+ bSize := 2 * blockSize4M
+
+ buf := make([]byte, 2*bSize, 2*bSize)
+ z.data = buf[:bSize] // Uncompressed buffer is the first half.
+
+ z.idx = 0
+
+	// The header consists of a single magic number; write it out.
+ if err := binary.Write(z.dst, binary.LittleEndian, frameMagicLegacy); err != nil {
+ return err
+ }
+ z.Header.done = true
+ if debugFlag {
+ debug("wrote header %v", z.Header)
+ }
+
+ return nil
+}
+
+// compressBlock compresses a block.
+func (z *WriterLegacy) compressBlock(data []byte) error {
+ bSize := 2 * blockSize4M
+ zdata := z.data[bSize:cap(z.data)]
+ // The compressed block size cannot exceed the input's.
+ var zn int
+
+ if level := z.Header.CompressionLevel; level != 0 {
+ zn, _ = CompressBlockHC(data, zdata, level)
+ } else {
+ zn, _ = CompressBlock(data, zdata, z.hashtable[:])
+ }
+
+ if debugFlag {
+ debug("block compression %d => %d", len(data), zn)
+ }
+ zdata = zdata[:zn]
+
+ // Write the block.
+ if err := binary.Write(z.dst, binary.LittleEndian, uint32(zn)); err != nil {
+ return err
+ }
+ written, err := z.dst.Write(zdata)
+ if err != nil {
+ return err
+ }
+ if h := z.OnBlockDone; h != nil {
+ h(written)
+ }
+ return nil
+}
+
+// Flush flushes any pending compressed data to the underlying writer.
+// Flush does not return until the data has been written.
+// If the underlying writer returns an error, Flush returns that error.
+func (z *WriterLegacy) Flush() error {
+ if debugFlag {
+ debug("flush with index %d", z.idx)
+ }
+ if z.idx == 0 {
+ return nil
+ }
+
+ data := z.data[:z.idx]
+ z.idx = 0
+ return z.compressBlock(data)
+}
+
+// Close closes the WriterLegacy, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
+func (z *WriterLegacy) Close() error {
+ if !z.Header.done {
+ if err := z.writeHeader(); err != nil {
+ return err
+ }
+ }
+ if err := z.Flush(); err != nil {
+ return err
+ }
+
+ if debugFlag {
+ debug("writing last empty block")
+ }
+
+ return nil
+}
+
+// Reset clears the state of the WriterLegacy z such that it is equivalent to its
+// initial state from NewWriterLegacy, but instead writing to w.
+// No access to the underlying io.Writer is performed.
+func (z *WriterLegacy) Reset(w io.Writer) {
+ z.Header.Reset()
+ z.dst = w
+ z.idx = 0
+ // reset hashtable to ensure deterministic output.
+ for i := range z.hashtable {
+ z.hashtable[i] = 0
+ }
+}
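
Editorial note: a minimal round-trip sketch (not part of the diff) pairing the new legacy-frame writer with the legacy reader added earlier in this change:

```go
package main

import (
	"bytes"
	"io"
	"os"
	"strings"

	"github.com/pierrec/lz4"
)

func main() {
	var buf bytes.Buffer

	// Compress into an in-memory legacy frame.
	zw := lz4.NewWriterLegacy(&buf)
	if _, err := io.Copy(zw, strings.NewReader("hello legacy frame")); err != nil {
		panic(err)
	}
	if err := zw.Close(); err != nil {
		panic(err)
	}

	// Decompress it back to stdout.
	zr := lz4.NewReaderLegacy(&buf)
	if _, err := io.Copy(os.Stdout, zr); err != nil {
		panic(err)
	}
}
```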