diff options
author | Aliaksandr Valialkin <valyala@gmail.com> | 2017-02-06 14:23:36 +0200 |
---|---|---|
committer | Aliaksandr Valialkin <valyala@gmail.com> | 2017-02-06 14:23:39 +0200 |
commit | e113a6dfce3d5f50e9529b6dbb4638402bcdb164 (patch) | |
tree | 181bf048f5f22db9dacc90c037d8c17224811dba /stackless | |
parent | stackless: send "func done" notification over a buffered channel, so the func... (diff) | |
download | fasthttp-e113a6dfce3d5f50e9529b6dbb4638402bcdb164.tar.gz fasthttp-e113a6dfce3d5f50e9529b6dbb4638402bcdb164.tar.bz2 fasthttp-e113a6dfce3d5f50e9529b6dbb4638402bcdb164.zip |
stackless: use dedicated worker pool per each stackless func.
This reduces tail latencies in our prod when multiple stackless funcs
are used concurrently.
Diffstat (limited to 'stackless')
-rw-r--r-- | stackless/func.go | 27 | ||||
-rw-r--r-- | stackless/func_test.go | 4 |
2 files changed, 15 insertions, 16 deletions
diff --git a/stackless/func.go b/stackless/func.go index 4202b49..9a49bcc 100644 --- a/stackless/func.go +++ b/stackless/func.go @@ -22,9 +22,19 @@ func NewFunc(f func(ctx interface{})) func(ctx interface{}) bool { if f == nil { panic("BUG: f cannot be nil") } + + funcWorkCh := make(chan *funcWork, runtime.GOMAXPROCS(-1)*2048) + onceInit := func() { + n := runtime.GOMAXPROCS(-1) + for i := 0; i < n; i++ { + go funcWorker(funcWorkCh, f) + } + } + var once sync.Once + return func(ctx interface{}) bool { + once.Do(onceInit) fw := getFuncWork() - fw.f = f fw.ctx = ctx select { @@ -39,22 +49,13 @@ func NewFunc(f func(ctx interface{})) func(ctx interface{}) bool { } } -func init() { - n := runtime.GOMAXPROCS(-1) - for i := 0; i < n; i++ { - go funcWorker() - } -} - -func funcWorker() { +func funcWorker(funcWorkCh <-chan *funcWork, f func(ctx interface{})) { for fw := range funcWorkCh { - fw.f(fw.ctx) + f(fw.ctx) fw.done <- struct{}{} } } -var funcWorkCh = make(chan *funcWork, runtime.GOMAXPROCS(-1)*1024) - func getFuncWork() *funcWork { v := funcWorkPool.Get() if v == nil { @@ -66,7 +67,6 @@ func getFuncWork() *funcWork { } func putFuncWork(fw *funcWork) { - fw.f = nil fw.ctx = nil funcWorkPool.Put(fw) } @@ -74,7 +74,6 @@ func putFuncWork(fw *funcWork) { var funcWorkPool sync.Pool type funcWork struct { - f func(ctx interface{}) ctx interface{} done chan struct{} } diff --git a/stackless/func_test.go b/stackless/func_test.go index 9fb4588..4f2c492 100644 --- a/stackless/func_test.go +++ b/stackless/func_test.go @@ -13,7 +13,7 @@ func TestNewFuncSimple(t *testing.T) { atomic.AddUint64(&n, uint64(ctx.(int))) }) - iterations := 2 * cap(funcWorkCh) + iterations := 4 * 1024 for i := 0; i < iterations; i++ { if !f(2) { t.Fatalf("f mustn't return false") @@ -33,7 +33,7 @@ func TestNewFuncMulti(t *testing.T) { atomic.AddUint64(&n2, uint64(ctx.(int))) }) - iterations := 2 * cap(funcWorkCh) + iterations := 4 * 1024 f1Done := make(chan error, 1) go func() { |