about summary refs log tree commit diff
path: root/stackless
diff options
context:
space:
mode:
authorGravatar Aliaksandr Valialkin <valyala@gmail.com> 2017-02-05 23:35:23 +0200
committerGravatar Aliaksandr Valialkin <valyala@gmail.com> 2017-02-05 23:35:23 +0200
commit5abb44878eac507866eeaafb490b71a34738c7f7 (patch)
tree6ee727f590f22be69e676a4fff4a1b3369ab6b24 /stackless
parentAdded Args.GetBool helper (diff)
downloadfasthttp-5abb44878eac507866eeaafb490b71a34738c7f7.tar.gz
fasthttp-5abb44878eac507866eeaafb490b71a34738c7f7.tar.bz2
fasthttp-5abb44878eac507866eeaafb490b71a34738c7f7.zip
stackless: added NewFunc() for wrapping stack-hungry CPU-bound functions
Diffstat (limited to 'stackless')
-rw-r--r--stackless/doc.go4
-rw-r--r--stackless/func.go80
-rw-r--r--stackless/func_test.go86
-rw-r--r--stackless/func_timing_test.go40
-rw-r--r--stackless/writer.go64
5 files changed, 236 insertions, 38 deletions
diff --git a/stackless/doc.go b/stackless/doc.go
index ca02615..8c0cc49 100644
--- a/stackless/doc.go
+++ b/stackless/doc.go
@@ -1,3 +1,3 @@
-// Package stackless saves stack space for high number of concurrently
-// running goroutines, which use writers from compress/* packages.
+// Package stackless provides functionality that may save stack space
+// for high number of concurrently running goroutines.
package stackless
diff --git a/stackless/func.go b/stackless/func.go
new file mode 100644
index 0000000..b00a4de
--- /dev/null
+++ b/stackless/func.go
@@ -0,0 +1,80 @@
+package stackless
+
+import (
+ "runtime"
+ "sync"
+)
+
+// NewFunc returns stackless wrapper for the function f.
+//
+// Unlike f, the returned stackless wrapper doesn't use stack space
+// on the goroutine that calls it.
+// The wrapper may save a lot of stack space if the following conditions
+// are met:
+//
+// - f doesn't contain blocking calls on network, I/O or channels;
+// - f uses a lot of stack space;
+// - the wrapper is called from high number of concurrent goroutines.
+//
+// The stackless wrapper returns false if the call cannot be processed
+// at the moment due to high load.
+func NewFunc(f func(ctx interface{})) func(ctx interface{}) bool {
+ if f == nil {
+ panic("BUG: f cannot be nil")
+ }
+ return func(ctx interface{}) bool {
+ fw := getFuncWork()
+ fw.f = f
+ fw.ctx = ctx
+
+ select {
+ case funcWorkCh <- fw:
+ default:
+ putFuncWork(fw)
+ return false
+ }
+ <-fw.done
+ putFuncWork(fw)
+ return true
+ }
+}
+
+func init() {
+ n := runtime.GOMAXPROCS(-1)
+ for i := 0; i < n; i++ {
+ go funcWorker()
+ }
+}
+
+func funcWorker() {
+ for fw := range funcWorkCh {
+ fw.f(fw.ctx)
+ fw.done <- struct{}{}
+ }
+}
+
+var funcWorkCh = make(chan *funcWork, runtime.GOMAXPROCS(-1)*1024)
+
+func getFuncWork() *funcWork {
+ v := funcWorkPool.Get()
+ if v == nil {
+ v = &funcWork{
+ done: make(chan struct{}),
+ }
+ }
+ return v.(*funcWork)
+}
+
+func putFuncWork(fw *funcWork) {
+ fw.f = nil
+ fw.ctx = nil
+ funcWorkPool.Put(fw)
+}
+
+var funcWorkPool sync.Pool
+
+type funcWork struct {
+ f func(ctx interface{})
+ ctx interface{}
+ done chan struct{}
+}
diff --git a/stackless/func_test.go b/stackless/func_test.go
new file mode 100644
index 0000000..9fb4588
--- /dev/null
+++ b/stackless/func_test.go
@@ -0,0 +1,86 @@
+package stackless
+
+import (
+ "fmt"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func TestNewFuncSimple(t *testing.T) {
+ var n uint64
+ f := NewFunc(func(ctx interface{}) {
+ atomic.AddUint64(&n, uint64(ctx.(int)))
+ })
+
+ iterations := 2 * cap(funcWorkCh)
+ for i := 0; i < iterations; i++ {
+ if !f(2) {
+ t.Fatalf("f mustn't return false")
+ }
+ }
+ if n != uint64(2*iterations) {
+ t.Fatalf("Unexpected n: %d. Expecting %d", n, 2*iterations)
+ }
+}
+
+func TestNewFuncMulti(t *testing.T) {
+ var n1, n2 uint64
+ f1 := NewFunc(func(ctx interface{}) {
+ atomic.AddUint64(&n1, uint64(ctx.(int)))
+ })
+ f2 := NewFunc(func(ctx interface{}) {
+ atomic.AddUint64(&n2, uint64(ctx.(int)))
+ })
+
+ iterations := 2 * cap(funcWorkCh)
+
+ f1Done := make(chan error, 1)
+ go func() {
+ var err error
+ for i := 0; i < iterations; i++ {
+ if !f1(3) {
+ err = fmt.Errorf("f1 mustn't return false")
+ break
+ }
+ }
+ f1Done <- err
+ }()
+
+ f2Done := make(chan error, 1)
+ go func() {
+ var err error
+ for i := 0; i < iterations; i++ {
+ if !f2(5) {
+ err = fmt.Errorf("f2 mustn't return false")
+ break
+ }
+ }
+ f2Done <- err
+ }()
+
+ select {
+ case err := <-f1Done:
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ case <-time.After(time.Second):
+ t.Fatalf("timeout")
+ }
+
+ select {
+ case err := <-f2Done:
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ case <-time.After(time.Second):
+ t.Fatalf("timeout")
+ }
+
+ if n1 != uint64(3*iterations) {
+ t.Fatalf("unexpected n1: %d. Expecting %d", n1, 3*iterations)
+ }
+ if n2 != uint64(5*iterations) {
+ t.Fatalf("unexpected n2: %d. Expecting %d", n2, 5*iterations)
+ }
+}
diff --git a/stackless/func_timing_test.go b/stackless/func_timing_test.go
new file mode 100644
index 0000000..cd4e463
--- /dev/null
+++ b/stackless/func_timing_test.go
@@ -0,0 +1,40 @@
+package stackless
+
+import (
+ "sync/atomic"
+ "testing"
+)
+
+func BenchmarkFuncOverhead(b *testing.B) {
+ var n uint64
+ f := NewFunc(func(ctx interface{}) {
+ atomic.AddUint64(&n, *(ctx.(*uint64)))
+ })
+ b.RunParallel(func(pb *testing.PB) {
+ x := uint64(1)
+ for pb.Next() {
+ if !f(&x) {
+ b.Fatalf("f mustn't return false")
+ }
+ }
+ })
+ if n != uint64(b.N) {
+		b.Fatalf("unexpected n: %d. Expecting %d", n, b.N)
+ }
+}
+
+func BenchmarkFuncPure(b *testing.B) {
+ var n uint64
+ f := func(x *uint64) {
+ atomic.AddUint64(&n, *x)
+ }
+ b.RunParallel(func(pb *testing.PB) {
+ x := uint64(1)
+ for pb.Next() {
+ f(&x)
+ }
+ })
+ if n != uint64(b.N) {
+		b.Fatalf("unexpected n: %d. Expecting %d", n, b.N)
+ }
+}
diff --git a/stackless/writer.go b/stackless/writer.go
index 3910081..9b9ff09 100644
--- a/stackless/writer.go
+++ b/stackless/writer.go
@@ -1,10 +1,10 @@
package stackless
import (
+ "errors"
"fmt"
"github.com/valyala/bytebufferpool"
"io"
- "runtime"
)
// Writer is an interface stackless writer must conform to.
@@ -31,7 +31,6 @@ type NewWriterFunc func(w io.Writer) Writer
func NewWriter(dstW io.Writer, newWriter NewWriterFunc) Writer {
w := &writer{
dstW: dstW,
- done: make(chan error),
}
w.zw = newWriter(&w.xw)
return w
@@ -42,8 +41,8 @@ type writer struct {
zw Writer
xw xWriter
- done chan error
- n int
+ err error
+ n int
p []byte
op op
@@ -81,8 +80,10 @@ func (w *writer) Reset(dstW io.Writer) {
func (w *writer) do(op op) error {
w.op = op
- writerCh <- w
- err := <-w.done
+ if !stacklessWriterFunc(w) {
+ return errHighLoad
+ }
+ err := w.err
if err != nil {
return err
}
@@ -94,6 +95,27 @@ func (w *writer) do(op op) error {
return err
}
+var errHighLoad = errors.New("cannot compress data due to high load")
+
+var stacklessWriterFunc = NewFunc(writerFunc)
+
+func writerFunc(ctx interface{}) {
+ w := ctx.(*writer)
+ switch w.op {
+ case opWrite:
+ w.n, w.err = w.zw.Write(w.p)
+ case opFlush:
+ w.err = w.zw.Flush()
+ case opClose:
+ w.err = w.zw.Close()
+ case opReset:
+ w.zw.Reset(&w.xw)
+ w.err = nil
+ default:
+ panic(fmt.Sprintf("BUG: unexpected op: %d", w.op))
+ }
+}
+
type xWriter struct {
bb *bytebufferpool.ByteBuffer
}
@@ -114,33 +136,3 @@ func (w *xWriter) Reset() {
}
var bufferPool bytebufferpool.Pool
-
-func init() {
- n := runtime.GOMAXPROCS(-1)
- writerCh = make(chan *writer, n)
- for i := 0; i < n; i++ {
- go worker()
- }
-}
-
-var writerCh chan *writer
-
-func worker() {
- var err error
- for w := range writerCh {
- switch w.op {
- case opWrite:
- w.n, err = w.zw.Write(w.p)
- case opFlush:
- err = w.zw.Flush()
- case opClose:
- err = w.zw.Close()
- case opReset:
- w.zw.Reset(&w.xw)
- err = nil
- default:
- panic(fmt.Sprintf("BUG: unexpected op: %d", w.op))
- }
- w.done <- err
- }
-}