diff options
author | Aliaksandr Valialkin <valyala@gmail.com> | 2017-02-05 23:35:23 +0200 |
---|---|---|
committer | Aliaksandr Valialkin <valyala@gmail.com> | 2017-02-05 23:35:23 +0200 |
commit | 5abb44878eac507866eeaafb490b71a34738c7f7 (patch) | |
tree | 6ee727f590f22be69e676a4fff4a1b3369ab6b24 /stackless | |
parent | Added Args.GetBool helper (diff) | |
download | fasthttp-5abb44878eac507866eeaafb490b71a34738c7f7.tar.gz fasthttp-5abb44878eac507866eeaafb490b71a34738c7f7.tar.bz2 fasthttp-5abb44878eac507866eeaafb490b71a34738c7f7.zip |
stackless: added NewFunc() for wrapping stack-hungry CPU-bound functions
Diffstat (limited to 'stackless')
-rw-r--r-- | stackless/doc.go | 4 | ||||
-rw-r--r-- | stackless/func.go | 80 | ||||
-rw-r--r-- | stackless/func_test.go | 86 | ||||
-rw-r--r-- | stackless/func_timing_test.go | 40 | ||||
-rw-r--r-- | stackless/writer.go | 64 |
5 files changed, 236 insertions, 38 deletions
diff --git a/stackless/doc.go b/stackless/doc.go index ca02615..8c0cc49 100644 --- a/stackless/doc.go +++ b/stackless/doc.go @@ -1,3 +1,3 @@ -// Package stackless saves stack space for high number of concurrently -// running goroutines, which use writers from compress/* packages. +// Package stackless provides functionality that may save stack space +// for high number of concurrently running goroutines. package stackless diff --git a/stackless/func.go b/stackless/func.go new file mode 100644 index 0000000..b00a4de --- /dev/null +++ b/stackless/func.go @@ -0,0 +1,80 @@ +package stackless + +import ( + "runtime" + "sync" +) + +// NewFunc returns stackless wrapper for the function f. +// +// Unlike f, the returned stackless wrapper doesn't use stack space +// on the goroutine that calls it. +// The wrapper may save a lot of stack space if the following conditions +// are met: +// +// - f doesn't contain blocking calls on network, I/O or channels; +// - f uses a lot of stack space; +// - the wrapper is called from high number of concurrent goroutines. +// +// The stackless wrapper returns false if the call cannot be processed +// at the moment due to high load. +func NewFunc(f func(ctx interface{})) func(ctx interface{}) bool { + if f == nil { + panic("BUG: f cannot be nil") + } + return func(ctx interface{}) bool { + fw := getFuncWork() + fw.f = f + fw.ctx = ctx + + select { + case funcWorkCh <- fw: + default: + putFuncWork(fw) + return false + } + <-fw.done + putFuncWork(fw) + return true + } +} + +func init() { + n := runtime.GOMAXPROCS(-1) + for i := 0; i < n; i++ { + go funcWorker() + } +} + +func funcWorker() { + for fw := range funcWorkCh { + fw.f(fw.ctx) + fw.done <- struct{}{} + } +} + +var funcWorkCh = make(chan *funcWork, runtime.GOMAXPROCS(-1)*1024) + +func getFuncWork() *funcWork { + v := funcWorkPool.Get() + if v == nil { + v = &funcWork{ + done: make(chan struct{}), + } + } + return v.(*funcWork) +} + +func putFuncWork(fw *funcWork) { + fw.f = nil + fw.ctx = nil + funcWorkPool.Put(fw) +} + +var funcWorkPool sync.Pool + +type funcWork struct { + f func(ctx interface{}) + ctx interface{} + done chan struct{} +} diff --git a/stackless/func_test.go b/stackless/func_test.go new file mode 100644 index 0000000..9fb4588 --- /dev/null +++ b/stackless/func_test.go @@ -0,0 +1,86 @@ +package stackless + +import ( + "fmt" + "sync/atomic" + "testing" + "time" +) + +func TestNewFuncSimple(t *testing.T) { + var n uint64 + f := NewFunc(func(ctx interface{}) { + atomic.AddUint64(&n, uint64(ctx.(int))) + }) + + iterations := 2 * cap(funcWorkCh) + for i := 0; i < iterations; i++ { + if !f(2) { + t.Fatalf("f mustn't return false") + } + } + if n != uint64(2*iterations) { + t.Fatalf("Unexpected n: %d. Expecting %d", n, 2*iterations) + } +} + +func TestNewFuncMulti(t *testing.T) { + var n1, n2 uint64 + f1 := NewFunc(func(ctx interface{}) { + atomic.AddUint64(&n1, uint64(ctx.(int))) + }) + f2 := NewFunc(func(ctx interface{}) { + atomic.AddUint64(&n2, uint64(ctx.(int))) + }) + + iterations := 2 * cap(funcWorkCh) + + f1Done := make(chan error, 1) + go func() { + var err error + for i := 0; i < iterations; i++ { + if !f1(3) { + err = fmt.Errorf("f1 mustn't return false") + break + } + } + f1Done <- err + }() + + f2Done := make(chan error, 1) + go func() { + var err error + for i := 0; i < iterations; i++ { + if !f2(5) { + err = fmt.Errorf("f2 mustn't return false") + break + } + } + f2Done <- err + }() + + select { + case err := <-f1Done: + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + case <-time.After(time.Second): + t.Fatalf("timeout") + } + + select { + case err := <-f2Done: + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + case <-time.After(time.Second): + t.Fatalf("timeout") + } + + if n1 != uint64(3*iterations) { + t.Fatalf("unexpected n1: %d. Expecting %d", n1, 3*iterations) + } + if n2 != uint64(5*iterations) { + t.Fatalf("unexpected n2: %d. Expecting %d", n2, 5*iterations) + } +} diff --git a/stackless/func_timing_test.go b/stackless/func_timing_test.go new file mode 100644 index 0000000..cd4e463 --- /dev/null +++ b/stackless/func_timing_test.go @@ -0,0 +1,40 @@ +package stackless + +import ( + "sync/atomic" + "testing" +) + +func BenchmarkFuncOverhead(b *testing.B) { + var n uint64 + f := NewFunc(func(ctx interface{}) { + atomic.AddUint64(&n, *(ctx.(*uint64))) + }) + b.RunParallel(func(pb *testing.PB) { + x := uint64(1) + for pb.Next() { + if !f(&x) { + b.Fatalf("f mustn't return false") + } + } + }) + if n != uint64(b.N) { + b.Fatalf("unexected n: %d. Expecting %d", n, b.N) + } +} + +func BenchmarkFuncPure(b *testing.B) { + var n uint64 + f := func(x *uint64) { + atomic.AddUint64(&n, *x) + } + b.RunParallel(func(pb *testing.PB) { + x := uint64(1) + for pb.Next() { + f(&x) + } + }) + if n != uint64(b.N) { + b.Fatalf("unexected n: %d. Expecting %d", n, b.N) + } +} diff --git a/stackless/writer.go b/stackless/writer.go index 3910081..9b9ff09 100644 --- a/stackless/writer.go +++ b/stackless/writer.go @@ -1,10 +1,10 @@ package stackless import ( + "errors" "fmt" "github.com/valyala/bytebufferpool" "io" - "runtime" ) // Writer is an interface stackless writer must conform to. @@ -31,7 +31,6 @@ type NewWriterFunc func(w io.Writer) Writer func NewWriter(dstW io.Writer, newWriter NewWriterFunc) Writer { w := &writer{ dstW: dstW, - done: make(chan error), } w.zw = newWriter(&w.xw) return w @@ -42,8 +41,8 @@ type writer struct { zw Writer xw xWriter - done chan error - n int + err error + n int p []byte op op @@ -81,8 +80,10 @@ func (w *writer) Reset(dstW io.Writer) { func (w *writer) do(op op) error { w.op = op - writerCh <- w - err := <-w.done + if !stacklessWriterFunc(w) { + return errHighLoad + } + err := w.err if err != nil { return err } @@ -94,6 +95,27 @@ func (w *writer) do(op op) error { return err } +var errHighLoad = errors.New("cannot compress data due to high load") + +var stacklessWriterFunc = NewFunc(writerFunc) + +func writerFunc(ctx interface{}) { + w := ctx.(*writer) + switch w.op { + case opWrite: + w.n, w.err = w.zw.Write(w.p) + case opFlush: + w.err = w.zw.Flush() + case opClose: + w.err = w.zw.Close() + case opReset: + w.zw.Reset(&w.xw) + w.err = nil + default: + panic(fmt.Sprintf("BUG: unexpected op: %d", w.op)) + } +} + type xWriter struct { bb *bytebufferpool.ByteBuffer } @@ -114,33 +136,3 @@ func (w *xWriter) Reset() { } var bufferPool bytebufferpool.Pool - -func init() { - n := runtime.GOMAXPROCS(-1) - writerCh = make(chan *writer, n) - for i := 0; i < n; i++ { - go worker() - } -} - -var writerCh chan *writer - -func worker() { - var err error - for w := range writerCh { - switch w.op { - case opWrite: - w.n, err = w.zw.Write(w.p) - case opFlush: - err = w.zw.Flush() - case opClose: - err = w.zw.Close() - case opReset: - w.zw.Reset(&w.xw) - err = nil - default: - panic(fmt.Sprintf("BUG: unexpected op: %d", w.op)) - } - w.done <- err - } -} |