aboutsummaryrefslogtreecommitdiff
path: root/stackless
diff options
context:
space:
mode:
authorGravatar Aliaksandr Valialkin <valyala@gmail.com> 2017-02-06 14:23:36 +0200
committerGravatar Aliaksandr Valialkin <valyala@gmail.com> 2017-02-06 14:23:39 +0200
commite113a6dfce3d5f50e9529b6dbb4638402bcdb164 (patch)
tree181bf048f5f22db9dacc90c037d8c17224811dba /stackless
parentstackless: send "func done" notification over a buffered channel, so the func... (diff)
downloadfasthttp-e113a6dfce3d5f50e9529b6dbb4638402bcdb164.tar.gz
fasthttp-e113a6dfce3d5f50e9529b6dbb4638402bcdb164.tar.bz2
fasthttp-e113a6dfce3d5f50e9529b6dbb4638402bcdb164.zip
stackless: use dedicated worker pool per each stackless func.
This reduces tail latencies in our prod when multiple stackless funcs are used concurrently.
Diffstat (limited to 'stackless')
-rw-r--r--stackless/func.go27
-rw-r--r--stackless/func_test.go4
2 files changed, 15 insertions, 16 deletions
diff --git a/stackless/func.go b/stackless/func.go
index 4202b49..9a49bcc 100644
--- a/stackless/func.go
+++ b/stackless/func.go
@@ -22,9 +22,19 @@ func NewFunc(f func(ctx interface{})) func(ctx interface{}) bool {
if f == nil {
panic("BUG: f cannot be nil")
}
+
+ funcWorkCh := make(chan *funcWork, runtime.GOMAXPROCS(-1)*2048)
+ onceInit := func() {
+ n := runtime.GOMAXPROCS(-1)
+ for i := 0; i < n; i++ {
+ go funcWorker(funcWorkCh, f)
+ }
+ }
+ var once sync.Once
+
return func(ctx interface{}) bool {
+ once.Do(onceInit)
fw := getFuncWork()
- fw.f = f
fw.ctx = ctx
select {
@@ -39,22 +49,13 @@ func NewFunc(f func(ctx interface{})) func(ctx interface{}) bool {
}
}
-func init() {
- n := runtime.GOMAXPROCS(-1)
- for i := 0; i < n; i++ {
- go funcWorker()
- }
-}
-
-func funcWorker() {
+func funcWorker(funcWorkCh <-chan *funcWork, f func(ctx interface{})) {
for fw := range funcWorkCh {
- fw.f(fw.ctx)
+ f(fw.ctx)
fw.done <- struct{}{}
}
}
-var funcWorkCh = make(chan *funcWork, runtime.GOMAXPROCS(-1)*1024)
-
func getFuncWork() *funcWork {
v := funcWorkPool.Get()
if v == nil {
@@ -66,7 +67,6 @@ func getFuncWork() *funcWork {
}
func putFuncWork(fw *funcWork) {
- fw.f = nil
fw.ctx = nil
funcWorkPool.Put(fw)
}
@@ -74,7 +74,6 @@ func putFuncWork(fw *funcWork) {
var funcWorkPool sync.Pool
type funcWork struct {
- f func(ctx interface{})
ctx interface{}
done chan struct{}
}
diff --git a/stackless/func_test.go b/stackless/func_test.go
index 9fb4588..4f2c492 100644
--- a/stackless/func_test.go
+++ b/stackless/func_test.go
@@ -13,7 +13,7 @@ func TestNewFuncSimple(t *testing.T) {
atomic.AddUint64(&n, uint64(ctx.(int)))
})
- iterations := 2 * cap(funcWorkCh)
+ iterations := 4 * 1024
for i := 0; i < iterations; i++ {
if !f(2) {
t.Fatalf("f mustn't return false")
@@ -33,7 +33,7 @@ func TestNewFuncMulti(t *testing.T) {
atomic.AddUint64(&n2, uint64(ctx.(int)))
})
- iterations := 2 * cap(funcWorkCh)
+ iterations := 4 * 1024
f1Done := make(chan error, 1)
go func() {