aboutsummaryrefslogtreecommitdiff
path: root/compress.go
diff options
context:
space:
mode:
authorGravatar Aliaksandr Valialkin <valyala@gmail.com> 2017-05-17 14:35:37 +0300
committerGravatar Aliaksandr Valialkin <valyala@gmail.com> 2017-05-17 14:45:31 +0300
commit30e92af08fad62919ff2eae84f530a2f52a547c3 (patch)
tree5a565c9ac513e70971d495f5a32d1e3ac36bf91f /compress.go
parentclient: properly extract tls ServerName from address without port (diff)
downloadfasthttp-30e92af08fad62919ff2eae84f530a2f52a547c3.tar.gz
fasthttp-30e92af08fad62919ff2eae84f530a2f52a547c3.tar.bz2
fasthttp-30e92af08fad62919ff2eae84f530a2f52a547c3.zip
Limit heap memory usage when compressing high number of concurrent responses
Previously each concurrent compression could allocate huge compression state with the size up to 1Mb each. So 10K concurrent connections could result in 10Gb of compression state in the heap. This CL limits the number of compression states among concurrent requests when {Append,Write}{Gzip,Deflate}* functions are called to O(GOMAXPROCS). These functions are used by CompressHandler* for non-streaming responses, i.e. it should cover the majority of use cases. Memory usage for 10K concurrent connections that compress responses drops from 10Gb to 200Mb after this CL.
Diffstat (limited to 'compress.go')
-rw-r--r--compress.go287
1 files changed, 218 insertions, 69 deletions
diff --git a/compress.go b/compress.go
index b6869c6..221472a 100644
--- a/compress.go
+++ b/compress.go
@@ -1,6 +1,7 @@
package fasthttp
import (
+ "bytes"
"fmt"
"io"
"os"
@@ -9,6 +10,7 @@ import (
"github.com/klauspost/compress/flate"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/zlib"
+ "github.com/valyala/bytebufferpool"
"github.com/valyala/fasthttp/stackless"
)
@@ -17,7 +19,8 @@ const (
CompressNoCompression = flate.NoCompression
CompressBestSpeed = flate.BestSpeed
CompressBestCompression = flate.BestCompression
- CompressDefaultCompression = flate.DefaultCompression
+ CompressDefaultCompression = 6 // flate.DefaultCompression
+ CompressHuffmanOnly = -2 // flate.HuffmanOnly
)
func acquireGzipReader(r io.Reader) (*gzip.Reader, error) {
@@ -70,51 +73,54 @@ func resetFlateReader(zr io.ReadCloser, r io.Reader) error {
var flateReaderPool sync.Pool
-func acquireGzipWriter(w io.Writer, level int) *gzipWriter {
- p := gzipWriterPoolMap[level]
- if p == nil {
- panic(fmt.Sprintf("BUG: unexpected compression level passed: %d. See compress/gzip for supported levels", level))
+func acquireStacklessGzipWriter(w io.Writer, level int) stackless.Writer {
+ nLevel := normalizeCompressLevel(level)
+ p := stacklessGzipWriterPoolMap[nLevel]
+ v := p.Get()
+ if v == nil {
+ return stackless.NewWriter(w, func(w io.Writer) stackless.Writer {
+ return acquireRealGzipWriter(w, level)
+ })
}
+ sw := v.(stackless.Writer)
+ sw.Reset(w)
+ return sw
+}
+
+func releaseStacklessGzipWriter(sw stackless.Writer, level int) {
+ sw.Close()
+ nLevel := normalizeCompressLevel(level)
+ p := stacklessGzipWriterPoolMap[nLevel]
+ p.Put(sw)
+}
+func acquireRealGzipWriter(w io.Writer, level int) *gzip.Writer {
+ nLevel := normalizeCompressLevel(level)
+ p := realGzipWriterPoolMap[nLevel]
v := p.Get()
if v == nil {
- sw := stackless.NewWriter(w, func(w io.Writer) stackless.Writer {
- zw, err := gzip.NewWriterLevel(w, level)
- if err != nil {
- panic(fmt.Sprintf("BUG: unexpected error from gzip.NewWriterLevel(%d): %s", level, err))
- }
- return zw
- })
- return &gzipWriter{
- Writer: sw,
- p: p,
+ zw, err := gzip.NewWriterLevel(w, level)
+ if err != nil {
+ panic(fmt.Sprintf("BUG: unexpected error from gzip.NewWriterLevel(%d): %s", level, err))
}
+ return zw
}
- zw := v.(*gzipWriter)
+ zw := v.(*gzip.Writer)
zw.Reset(w)
return zw
}
-func releaseGzipWriter(zw *gzipWriter) {
+func releaseRealGzipWriter(zw *gzip.Writer, level int) {
zw.Close()
- zw.p.Put(zw)
+ nLevel := normalizeCompressLevel(level)
+ p := realGzipWriterPoolMap[nLevel]
+ p.Put(zw)
}
-type gzipWriter struct {
- stackless.Writer
- p *sync.Pool
-}
-
-var gzipWriterPoolMap = func() map[int]*sync.Pool {
- // Initialize pools for all the compression levels defined
- // in https://golang.org/pkg/compress/gzip/#pkg-constants .
- m := make(map[int]*sync.Pool, 11)
- m[-1] = &sync.Pool{}
- for i := 0; i < 10; i++ {
- m[i] = &sync.Pool{}
- }
- return m
-}()
+var (
+ stacklessGzipWriterPoolMap = newCompressWriterPoolMap()
+ realGzipWriterPoolMap = newCompressWriterPoolMap()
+)
// AppendGzipBytesLevel appends gzipped src to dst using the given
// compression level and returns the resulting dst.
@@ -125,6 +131,7 @@ var gzipWriterPoolMap = func() map[int]*sync.Pool {
// * CompressBestSpeed
// * CompressBestCompression
// * CompressDefaultCompression
+// * CompressHuffmanOnly
func AppendGzipBytesLevel(dst, src []byte, level int) []byte {
w := &byteSliceWriter{dst}
WriteGzipLevel(w, src, level)
@@ -140,11 +147,41 @@ func AppendGzipBytesLevel(dst, src []byte, level int) []byte {
// * CompressBestSpeed
// * CompressBestCompression
// * CompressDefaultCompression
+// * CompressHuffmanOnly
func WriteGzipLevel(w io.Writer, p []byte, level int) (int, error) {
- zw := acquireGzipWriter(w, level)
- n, err := zw.Write(p)
- releaseGzipWriter(zw)
- return n, err
+ switch w.(type) {
+ case *byteSliceWriter,
+ *bytes.Buffer,
+ *ByteBuffer,
+ *bytebufferpool.ByteBuffer:
+ // These writers don't block, so we can just use stacklessWriteGzip
+ ctx := &compressCtx{
+ w: w,
+ p: p,
+ level: level,
+ }
+ stacklessWriteGzip(ctx)
+ return len(p), nil
+ default:
+ zw := acquireStacklessGzipWriter(w, level)
+ n, err := zw.Write(p)
+ releaseStacklessGzipWriter(zw, level)
+ return n, err
+ }
+}
+
+var stacklessWriteGzip = stackless.NewFunc(nonblockingWriteGzip)
+
+func nonblockingWriteGzip(ctxv interface{}) {
+ ctx := ctxv.(*compressCtx)
+ zw := acquireRealGzipWriter(ctx.w, ctx.level)
+
+ _, err := zw.Write(ctx.p)
+ if err != nil {
+ panic(fmt.Sprintf("BUG: gzip.Writer.Write for len(p)=%d returned unexpected error: %s", len(ctx.p), err))
+ }
+
+ releaseRealGzipWriter(zw, ctx.level)
}
// WriteGzip writes gzipped p to w and returns the number of compressed
@@ -175,6 +212,92 @@ func WriteGunzip(w io.Writer, p []byte) (int, error) {
return nn, err
}
+// AppendGunzipBytes appends gunzipped src to dst and returns the resulting dst.
+func AppendGunzipBytes(dst, src []byte) ([]byte, error) {
+ w := &byteSliceWriter{dst}
+ _, err := WriteGunzip(w, src)
+ return w.b, err
+}
+
+// AppendDeflateBytesLevel appends deflated src to dst using the given
+// compression level and returns the resulting dst.
+//
+// Supported compression levels are:
+//
+// * CompressNoCompression
+// * CompressBestSpeed
+// * CompressBestCompression
+// * CompressDefaultCompression
+// * CompressHuffmanOnly
+func AppendDeflateBytesLevel(dst, src []byte, level int) []byte {
+ w := &byteSliceWriter{dst}
+ WriteDeflateLevel(w, src, level)
+ return w.b
+}
+
+// WriteDeflateLevel writes deflated p to w using the given compression level
+// and returns the number of compressed bytes written to w.
+//
+// Supported compression levels are:
+//
+// * CompressNoCompression
+// * CompressBestSpeed
+// * CompressBestCompression
+// * CompressDefaultCompression
+// * CompressHuffmanOnly
+func WriteDeflateLevel(w io.Writer, p []byte, level int) (int, error) {
+ switch w.(type) {
+ case *byteSliceWriter,
+ *bytes.Buffer,
+ *ByteBuffer,
+ *bytebufferpool.ByteBuffer:
+ // These writers don't block, so we can just use stacklessWriteDeflate
+ ctx := &compressCtx{
+ w: w,
+ p: p,
+ level: level,
+ }
+ stacklessWriteDeflate(ctx)
+ return len(p), nil
+ default:
+ zw := acquireStacklessDeflateWriter(w, level)
+ n, err := zw.Write(p)
+ releaseStacklessDeflateWriter(zw, level)
+ return n, err
+ }
+}
+
+var stacklessWriteDeflate = stackless.NewFunc(nonblockingWriteDeflate)
+
+func nonblockingWriteDeflate(ctxv interface{}) {
+ ctx := ctxv.(*compressCtx)
+ zw := acquireRealDeflateWriter(ctx.w, ctx.level)
+
+ _, err := zw.Write(ctx.p)
+ if err != nil {
+ panic(fmt.Sprintf("BUG: zlib.Writer.Write for len(p)=%d returned unexpected error: %s", len(ctx.p), err))
+ }
+
+ releaseRealDeflateWriter(zw, ctx.level)
+}
+
+type compressCtx struct {
+ w io.Writer
+ p []byte
+ level int
+}
+
+// WriteDeflate writes deflated p to w and returns the number of compressed
+// bytes written to w.
+func WriteDeflate(w io.Writer, p []byte) (int, error) {
+ return WriteDeflateLevel(w, p, CompressDefaultCompression)
+}
+
+// AppendDeflateBytes appends deflated src to dst and returns the resulting dst.
+func AppendDeflateBytes(dst, src []byte) []byte {
+ return AppendDeflateBytesLevel(dst, src, CompressDefaultCompression)
+}
+
// WriteInflate writes inflated p to w and returns the number of uncompressed
// bytes written to w.
func WriteInflate(w io.Writer, p []byte) (int, error) {
@@ -192,10 +315,10 @@ func WriteInflate(w io.Writer, p []byte) (int, error) {
return nn, err
}
-// AppendGunzipBytes append gunzipped src to dst and returns the resulting dst.
-func AppendGunzipBytes(dst, src []byte) ([]byte, error) {
+// AppendInflateBytes appends inflated src to dst and returns the resulting dst.
+func AppendInflateBytes(dst, src []byte) ([]byte, error) {
w := &byteSliceWriter{dst}
- _, err := WriteGunzip(w, src)
+ _, err := WriteInflate(w, src)
return w.b, err
}
@@ -221,64 +344,79 @@ func (r *byteSliceReader) Read(p []byte) (int, error) {
return n, nil
}
-func acquireFlateWriter(w io.Writer, level int) *flateWriter {
- p := flateWriterPoolMap[level]
- if p == nil {
- panic(fmt.Sprintf("BUG: unexpected compression level passed: %d. See compress/flate for supported levels", level))
+func acquireStacklessDeflateWriter(w io.Writer, level int) stackless.Writer {
+ nLevel := normalizeCompressLevel(level)
+ p := stacklessDeflateWriterPoolMap[nLevel]
+ v := p.Get()
+ if v == nil {
+ return stackless.NewWriter(w, func(w io.Writer) stackless.Writer {
+ return acquireRealDeflateWriter(w, level)
+ })
}
+ sw := v.(stackless.Writer)
+ sw.Reset(w)
+ return sw
+}
+func releaseStacklessDeflateWriter(sw stackless.Writer, level int) {
+ sw.Close()
+ nLevel := normalizeCompressLevel(level)
+ p := stacklessDeflateWriterPoolMap[nLevel]
+ p.Put(sw)
+}
+
+func acquireRealDeflateWriter(w io.Writer, level int) *zlib.Writer {
+ nLevel := normalizeCompressLevel(level)
+ p := realDeflateWriterPoolMap[nLevel]
v := p.Get()
if v == nil {
- sw := stackless.NewWriter(w, func(w io.Writer) stackless.Writer {
- zw, err := zlib.NewWriterLevel(w, level)
- if err != nil {
- panic(fmt.Sprintf("BUG: unexpected error in zlib.NewWriterLevel(%d): %s", level, err))
- }
- return zw
- })
- return &flateWriter{
- Writer: sw,
- p: p,
+ zw, err := zlib.NewWriterLevel(w, level)
+ if err != nil {
+ panic(fmt.Sprintf("BUG: unexpected error from zlib.NewWriterLevel(%d): %s", level, err))
}
+ return zw
}
- zw := v.(*flateWriter)
+ zw := v.(*zlib.Writer)
zw.Reset(w)
return zw
}
-func releaseFlateWriter(zw *flateWriter) {
+func releaseRealDeflateWriter(zw *zlib.Writer, level int) {
zw.Close()
- zw.p.Put(zw)
+ nLevel := normalizeCompressLevel(level)
+ p := realDeflateWriterPoolMap[nLevel]
+ p.Put(zw)
}
-type flateWriter struct {
- stackless.Writer
- p *sync.Pool
-}
+var (
+ stacklessDeflateWriterPoolMap = newCompressWriterPoolMap()
+ realDeflateWriterPoolMap = newCompressWriterPoolMap()
+)
-var flateWriterPoolMap = func() map[int]*sync.Pool {
+func newCompressWriterPoolMap() []*sync.Pool {
// Initialize pools for all the compression levels defined
// in https://golang.org/pkg/compress/flate/#pkg-constants .
- m := make(map[int]*sync.Pool, 11)
- m[-1] = &sync.Pool{}
- for i := 0; i < 10; i++ {
- m[i] = &sync.Pool{}
+ // Compression levels are normalized with normalizeCompressLevel,
+	// so they fit [0..11].
+ var m []*sync.Pool
+ for i := 0; i < 12; i++ {
+ m = append(m, &sync.Pool{})
}
return m
-}()
+}
func isFileCompressible(f *os.File, minCompressRatio float64) bool {
	// Try compressing the first 4kb of the file
// and see if it can be compressed by more than
// the given minCompressRatio.
b := AcquireByteBuffer()
- zw := acquireGzipWriter(b, CompressDefaultCompression)
+ zw := acquireStacklessGzipWriter(b, CompressDefaultCompression)
lr := &io.LimitedReader{
R: f,
N: 4096,
}
_, err := copyZeroAlloc(zw, lr)
- releaseGzipWriter(zw)
+ releaseStacklessGzipWriter(zw, CompressDefaultCompression)
f.Seek(0, 0)
if err != nil {
return false
@@ -289,3 +427,14 @@ func isFileCompressible(f *os.File, minCompressRatio float64) bool {
ReleaseByteBuffer(b)
return float64(zn) < float64(n)*minCompressRatio
}
+
+// normalizes compression level into [0..11], so it could be used as an index
+// in *PoolMap.
+func normalizeCompressLevel(level int) int {
+ // -2 is the lowest compression level - CompressHuffmanOnly
+ // 9 is the highest compression level - CompressBestCompression
+ if level < -2 || level > 9 {
+ level = CompressDefaultCompression
+ }
+ return level + 2
+}