aboutsummaryrefslogtreecommitdiff
path: root/stackless
diff options
context:
space:
mode:
authorGravatar Aliaksandr Valialkin <valyala@gmail.com> 2016-11-03 19:38:38 +0200
committerGravatar Aliaksandr Valialkin <valyala@gmail.com> 2016-11-03 19:38:42 +0200
commitf49c6b3f963bd9c86da5df1399ee694419a6fe97 (patch)
tree8a38c806e10ae31d4402032e1235b08980b47eb3 /stackless
parentIssue #193: properly handle nil body when calling WriteGzip* and WriteDeflate... (diff)
downloadfasthttp-f49c6b3f963bd9c86da5df1399ee694419a6fe97.tar.gz
fasthttp-f49c6b3f963bd9c86da5df1399ee694419a6fe97.tar.bz2
fasthttp-f49c6b3f963bd9c86da5df1399ee694419a6fe97.zip
Reduce stack space usage when using response compression
Diffstat (limited to 'stackless')
-rw-r--r--stackless/doc.go3
-rw-r--r--stackless/writer.go146
-rw-r--r--stackless/writer_test.go122
3 files changed, 271 insertions, 0 deletions
diff --git a/stackless/doc.go b/stackless/doc.go
new file mode 100644
index 0000000..37591dd
--- /dev/null
+++ b/stackless/doc.go
@@ -0,0 +1,3 @@
+// Package stackless saves stack space when using writers from compress/*
+// packages.
+package stackless
diff --git a/stackless/writer.go b/stackless/writer.go
new file mode 100644
index 0000000..18c3e62
--- /dev/null
+++ b/stackless/writer.go
@@ -0,0 +1,146 @@
+package stackless
+
+import (
+ "fmt"
+ "github.com/valyala/bytebufferpool"
+ "io"
+ "runtime"
+)
+
+// Writer is an interface stackless writer must conform to.
+//
+// The interface contains common subset for Writers from compress/* packages.
+type Writer interface {
+ Write(p []byte) (int, error)
+ Flush() error
+ Close() error
+ Reset(w io.Writer)
+}
+
+// NewWriterFunc must return new writer that will be wrapped into
+// stackless writer.
+type NewWriterFunc func(w io.Writer) Writer
+
+// NewWriter creates a stackless writer around a writer returned
+// from newWriter.
+//
+// The returned writer writes data to dstW.
+//
+// Writers that use a lot of stack space may be wrapped into stackless writer,
+// thus saving stack space.
+func NewWriter(dstW io.Writer, newWriter NewWriterFunc) Writer {
+ w := &writer{
+ dstW: dstW,
+ done: make(chan error),
+ }
+ w.zw = newWriter(&w.xw)
+ return w
+}
+
+type writer struct {
+ dstW io.Writer
+ zw Writer
+ xw xWriter
+
+ done chan error
+ n int
+
+ p []byte
+ op op
+}
+
+type op int
+
+const (
+ opWrite op = iota
+ opFlush
+ opClose
+ opReset
+)
+
+func (w *writer) Write(p []byte) (int, error) {
+ w.p = p
+ err := w.do(opWrite)
+ w.p = nil
+ return w.n, err
+}
+
+func (w *writer) Flush() error {
+ return w.do(opFlush)
+}
+
+func (w *writer) Close() error {
+ return w.do(opClose)
+}
+
+func (w *writer) Reset(dstW io.Writer) {
+ w.xw.Reset()
+ w.do(opReset)
+ w.dstW = dstW
+}
+
+func (w *writer) do(op op) error {
+ w.op = op
+ writerCh <- w
+ err := <-w.done
+ if err != nil {
+ return err
+ }
+ if w.xw.bb != nil {
+ _, err = w.dstW.Write(w.xw.bb.B)
+ }
+ w.xw.Reset()
+
+ return err
+}
+
+type xWriter struct {
+ bb *bytebufferpool.ByteBuffer
+}
+
+func (w *xWriter) Write(p []byte) (int, error) {
+ if w.bb == nil {
+ w.bb = bufferPool.Get()
+ }
+ w.bb.Write(p)
+ return len(p), nil
+}
+
+func (w *xWriter) Reset() {
+ if w.bb != nil {
+ bufferPool.Put(w.bb)
+ w.bb = nil
+ }
+}
+
+var bufferPool bytebufferpool.Pool
+
+func init() {
+ n := runtime.GOMAXPROCS(-1)
+ writerCh = make(chan *writer, n)
+ for i := 0; i < n; i++ {
+ go worker()
+ }
+}
+
+var writerCh chan *writer
+
+func worker() {
+ var err error
+ for w := range writerCh {
+ switch w.op {
+ case opWrite:
+ w.n, err = w.zw.Write(w.p)
+ case opFlush:
+ err = w.zw.Flush()
+ case opClose:
+ err = w.zw.Close()
+ case opReset:
+ w.zw.Reset(&w.xw)
+ err = nil
+ default:
+ panic(fmt.Sprintf("BUG: unexpected op: %d", w.op))
+ }
+ w.done <- err
+ }
+}
diff --git a/stackless/writer_test.go b/stackless/writer_test.go
new file mode 100644
index 0000000..f36f18d
--- /dev/null
+++ b/stackless/writer_test.go
@@ -0,0 +1,122 @@
+package stackless
+
+import (
+ "bytes"
+ "compress/flate"
+ "compress/gzip"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "testing"
+ "time"
+)
+
+func TestCompressFlateSerial(t *testing.T) {
+ if err := testCompressFlate(); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+}
+
+func TestCompressFlateConcurrent(t *testing.T) {
+ if err := testConcurrent(testCompressFlate, 10); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+}
+
+func testCompressFlate() error {
+ return testWriter(func(w io.Writer) Writer {
+ zw, err := flate.NewWriter(w, flate.DefaultCompression)
+ if err != nil {
+ panic(fmt.Sprintf("BUG: unexpected error: %s", err))
+ }
+ return zw
+ }, func(r io.Reader) io.Reader {
+ return flate.NewReader(r)
+ })
+}
+
+func TestCompressGzipSerial(t *testing.T) {
+ if err := testCompressGzip(); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+}
+
+func TestCompressGzipConcurrent(t *testing.T) {
+ if err := testConcurrent(testCompressGzip, 10); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+}
+
+func testCompressGzip() error {
+ return testWriter(func(w io.Writer) Writer {
+ return gzip.NewWriter(w)
+ }, func(r io.Reader) io.Reader {
+ zr, err := gzip.NewReader(r)
+ if err != nil {
+ panic(fmt.Sprintf("BUG: cannot create gzip reader: %s", err))
+ }
+ return zr
+ })
+}
+
+func testWriter(newWriter NewWriterFunc, newReader func(io.Reader) io.Reader) error {
+ dstW := &bytes.Buffer{}
+ w := NewWriter(dstW, newWriter)
+
+ for i := 0; i < 5; i++ {
+ if err := testWriterReuse(w, dstW, newReader); err != nil {
+ return fmt.Errorf("unepxected error when re-using writer on iteration %d: %s", i, err)
+ }
+ dstW = &bytes.Buffer{}
+ w.Reset(dstW)
+ }
+
+ return nil
+}
+
+func testWriterReuse(w Writer, r io.Reader, newReader func(io.Reader) io.Reader) error {
+ wantW := &bytes.Buffer{}
+ mw := io.MultiWriter(w, wantW)
+ for i := 0; i < 30; i++ {
+ fmt.Fprintf(mw, "foobar %d\n", i)
+ if i%13 == 0 {
+ if err := w.Flush(); err != nil {
+ return fmt.Errorf("error on flush: %s", err)
+ }
+ }
+ }
+ w.Close()
+
+ zr := newReader(r)
+ data, err := ioutil.ReadAll(zr)
+ if err != nil {
+ return fmt.Errorf("unexpected error: %s, data=%q", err, data)
+ }
+
+ wantData := wantW.Bytes()
+ if !bytes.Equal(data, wantData) {
+ return fmt.Errorf("unexpected data: %q. Expecting %q", data, wantData)
+ }
+
+ return nil
+}
+
+func testConcurrent(testFunc func() error, concurrency int) error {
+ ch := make(chan error, concurrency)
+ for i := 0; i < concurrency; i++ {
+ go func() {
+ ch <- testFunc()
+ }()
+ }
+ for i := 0; i < concurrency; i++ {
+ select {
+ case err := <-ch:
+ if err != nil {
+ return fmt.Errorf("unexpected error on goroutine %d: %s", i, err)
+ }
+ case <-time.After(time.Second):
+ return fmt.Errorf("timeout on goroutine %d", i)
+ }
+ }
+ return nil
+}