aboutsummaryrefslogtreecommitdiff
path: root/compress_test.go
diff options
context:
space:
mode:
authorGravatar Aliaksandr Valialkin <valyala@gmail.com> 2017-05-17 14:35:37 +0300
committerGravatar Aliaksandr Valialkin <valyala@gmail.com> 2017-05-17 14:45:31 +0300
commit30e92af08fad62919ff2eae84f530a2f52a547c3 (patch)
tree5a565c9ac513e70971d495f5a32d1e3ac36bf91f /compress_test.go
parentclient: properly extract tls ServerName from address without port (diff)
downloadfasthttp-30e92af08fad62919ff2eae84f530a2f52a547c3.tar.gz
fasthttp-30e92af08fad62919ff2eae84f530a2f52a547c3.tar.bz2
fasthttp-30e92af08fad62919ff2eae84f530a2f52a547c3.zip
Limit heap memory usage when compressing high number of concurrent responses
Previously each concurrent compression could allocate huge compression state with the size up to 1Mb each. So 10K concurrent connections could result in 10Gb of compression state in the heap. This CL limits the number of compression states among concurrent requests when {Append,Write}{Gzip,Deflate}* functions are called to O(GOMAXPROCS). These functions are used by CompressHandler* for non-streaming responses, i.e. it should cover the majority of use cases. Memory usage for 10K concurrent connections that compress responses drops from 10Gb to 200Mb after this CL.
Diffstat (limited to 'compress_test.go')
-rw-r--r--compress_test.go189
1 files changed, 158 insertions, 31 deletions
diff --git a/compress_test.go b/compress_test.go
index 6a61897..f0cdaff 100644
--- a/compress_test.go
+++ b/compress_test.go
@@ -2,88 +2,215 @@ package fasthttp
import (
"bytes"
+ "fmt"
"io/ioutil"
"testing"
+ "time"
)
-func TestGzipBytes(t *testing.T) {
- testGzipBytes(t, "")
- testGzipBytes(t, "foobar")
- testGzipBytes(t, "выфаодлодл одлфываыв sd2 k34")
+var compressTestcases = func() []string {
+ a := []string{
+ "",
+ "foobar",
+ "выфаодлодл одлфываыв sd2 k34",
+ }
+ bigS := createFixedBody(1e4)
+ a = append(a, string(bigS))
+ return a
+}()
+
+func TestGzipBytesSerial(t *testing.T) {
+ if err := testGzipBytes(); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestGzipBytesConcurrent(t *testing.T) {
+ if err := testConcurrent(10, testGzipBytes); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestDeflateBytesSerial(t *testing.T) {
+ if err := testDeflateBytes(); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestDeflateBytesConcurrent(t *testing.T) {
+ if err := testConcurrent(10, testDeflateBytes); err != nil {
+ t.Fatal(err)
+ }
}
-func testGzipBytes(t *testing.T, s string) {
+func testGzipBytes() error {
+ for _, s := range compressTestcases {
+ if err := testGzipBytesSingleCase(s); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func testDeflateBytes() error {
+ for _, s := range compressTestcases {
+ if err := testDeflateBytesSingleCase(s); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func testGzipBytesSingleCase(s string) error {
prefix := []byte("foobar")
gzippedS := AppendGzipBytes(prefix, []byte(s))
if !bytes.Equal(gzippedS[:len(prefix)], prefix) {
- t.Fatalf("unexpected prefix when compressing %q: %q. Expecting %q", s, gzippedS[:len(prefix)], prefix)
+ return fmt.Errorf("unexpected prefix when compressing %q: %q. Expecting %q", s, gzippedS[:len(prefix)], prefix)
}
gunzippedS, err := AppendGunzipBytes(prefix, gzippedS[len(prefix):])
if err != nil {
- t.Fatalf("unexpected error when uncompressing %q: %s", s, err)
+ return fmt.Errorf("unexpected error when uncompressing %q: %s", s, err)
}
if !bytes.Equal(gunzippedS[:len(prefix)], prefix) {
- t.Fatalf("unexpected prefix when uncompressing %q: %q. Expecting %q", s, gunzippedS[:len(prefix)], prefix)
+ return fmt.Errorf("unexpected prefix when uncompressing %q: %q. Expecting %q", s, gunzippedS[:len(prefix)], prefix)
}
gunzippedS = gunzippedS[len(prefix):]
if string(gunzippedS) != s {
- t.Fatalf("unexpected uncompressed string %q. Expecting %q", gunzippedS, s)
+ return fmt.Errorf("unexpected uncompressed string %q. Expecting %q", gunzippedS, s)
}
+ return nil
}
-func TestGzipCompress(t *testing.T) {
- testGzipCompress(t, "")
- testGzipCompress(t, "foobar")
- testGzipCompress(t, "ajjnkn asdlkjfqoijfw jfqkwj foj eowjiq")
+func testDeflateBytesSingleCase(s string) error {
+ prefix := []byte("foobar")
+ deflatedS := AppendDeflateBytes(prefix, []byte(s))
+ if !bytes.Equal(deflatedS[:len(prefix)], prefix) {
+ return fmt.Errorf("unexpected prefix when compressing %q: %q. Expecting %q", s, deflatedS[:len(prefix)], prefix)
+ }
+
+ inflatedS, err := AppendInflateBytes(prefix, deflatedS[len(prefix):])
+ if err != nil {
+ return fmt.Errorf("unexpected error when uncompressing %q: %s", s, err)
+ }
+ if !bytes.Equal(inflatedS[:len(prefix)], prefix) {
+ return fmt.Errorf("unexpected prefix when uncompressing %q: %q. Expecting %q", s, inflatedS[:len(prefix)], prefix)
+ }
+ inflatedS = inflatedS[len(prefix):]
+ if string(inflatedS) != s {
+ return fmt.Errorf("unexpected uncompressed string %q. Expecting %q", inflatedS, s)
+ }
+ return nil
+}
+
+func TestGzipCompressSerial(t *testing.T) {
+ if err := testGzipCompress(); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestGzipCompressConcurrent(t *testing.T) {
+ if err := testConcurrent(10, testGzipCompress); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestFlateCompressSerial(t *testing.T) {
+ if err := testFlateCompress(); err != nil {
+ t.Fatal(err)
+ }
}
-func TestFlateCompress(t *testing.T) {
- testFlateCompress(t, "")
- testFlateCompress(t, "foobar")
- testFlateCompress(t, "adf asd asd fasd fasd")
+func TestFlateCompressConcurrent(t *testing.T) {
+ if err := testConcurrent(10, testFlateCompress); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testGzipCompress() error {
+ for _, s := range compressTestcases {
+ if err := testGzipCompressSingleCase(s); err != nil {
+ return err
+ }
+ }
+ return nil
}
-func testGzipCompress(t *testing.T, s string) {
+func testFlateCompress() error {
+ for _, s := range compressTestcases {
+ if err := testFlateCompressSingleCase(s); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func testGzipCompressSingleCase(s string) error {
var buf bytes.Buffer
- zw := acquireGzipWriter(&buf, CompressDefaultCompression)
+ zw := acquireStacklessGzipWriter(&buf, CompressDefaultCompression)
if _, err := zw.Write([]byte(s)); err != nil {
- t.Fatalf("unexpected error: %s. s=%q", err, s)
+ return fmt.Errorf("unexpected error: %s. s=%q", err, s)
}
- releaseGzipWriter(zw)
+ releaseStacklessGzipWriter(zw, CompressDefaultCompression)
zr, err := acquireGzipReader(&buf)
if err != nil {
- t.Fatalf("unexpected error: %s. s=%q", err, s)
+ return fmt.Errorf("unexpected error: %s. s=%q", err, s)
}
body, err := ioutil.ReadAll(zr)
if err != nil {
- t.Fatalf("unexpected error: %s. s=%q", err, s)
+ return fmt.Errorf("unexpected error: %s. s=%q", err, s)
}
if string(body) != s {
- t.Fatalf("unexpected string after decompression: %q. Expecting %q", body, s)
+ return fmt.Errorf("unexpected string after decompression: %q. Expecting %q", body, s)
}
releaseGzipReader(zr)
+ return nil
}
-func testFlateCompress(t *testing.T, s string) {
+func testFlateCompressSingleCase(s string) error {
var buf bytes.Buffer
- zw := acquireFlateWriter(&buf, CompressDefaultCompression)
+ zw := acquireStacklessDeflateWriter(&buf, CompressDefaultCompression)
if _, err := zw.Write([]byte(s)); err != nil {
- t.Fatalf("unexpected error: %s. s=%q", err, s)
+ return fmt.Errorf("unexpected error: %s. s=%q", err, s)
}
- releaseFlateWriter(zw)
+ releaseStacklessDeflateWriter(zw, CompressDefaultCompression)
zr, err := acquireFlateReader(&buf)
if err != nil {
- t.Fatalf("unexpected error: %s. s=%q", err, s)
+ return fmt.Errorf("unexpected error: %s. s=%q", err, s)
}
body, err := ioutil.ReadAll(zr)
if err != nil {
- t.Fatalf("unexpected error: %s. s=%q", err, s)
+ return fmt.Errorf("unexpected error: %s. s=%q", err, s)
}
if string(body) != s {
- t.Fatalf("unexpected string after decompression: %q. Expecting %q", body, s)
+ return fmt.Errorf("unexpected string after decompression: %q. Expecting %q", body, s)
}
releaseFlateReader(zr)
+ return nil
+}
+
+func testConcurrent(concurrency int, f func() error) error {
+ ch := make(chan error, concurrency)
+ for i := 0; i < concurrency; i++ {
+ go func(idx int) {
+ err := f()
+ if err != nil {
+ ch <- fmt.Errorf("error in goroutine %d: %s", idx, err)
+ }
+ ch <- nil
+ }(i)
+ }
+ for i := 0; i < concurrency; i++ {
+ select {
+ case err := <-ch:
+ if err != nil {
+ return err
+ }
+ case <-time.After(time.Second):
+ return fmt.Errorf("timeout")
+ }
+ }
+ return nil
}