diff options
author | Andy Pan <panjf2000@gmail.com> | 2019-10-28 02:27:31 +0800 |
---|---|---|
committer | Erik Dubbelboer <erik@dubbelboer.com> | 2019-10-28 02:27:31 +0800 |
commit | 9f11af296864153ee45341d3f2fe0f5178fd6210 (patch) | |
tree | def788b7807a60bc9942d56e265dfec8b59eab67 /workerpool.go | |
parent | Requests with incomplete bodies no longer cause log noise (#682) (diff) | |
download | fasthttp-9f11af296864153ee45341d3f2fe0f5178fd6210.tar.gz fasthttp-9f11af296864153ee45341d3f2fe0f5178fd6210.tar.bz2 fasthttp-9f11af296864153ee45341d3f2fe0f5178fd6210.zip |
Make several optimizations to worker pool (#680)
* Use binary-search algorithm to speed up cleaning up workers
* Speed it up when iterating the slice of workerChan
* Use sync.Pool as a more canonical way
* Add benchmark test between binary-search and linear search
* Optimize range to the slice of workerChan, avoiding elements copy
* Perfect the benchmark of work pool
* Make binary-search code inline and remove benchmark test code
Diffstat (limited to 'workerpool.go')
-rw-r--r-- | workerpool.go | 53 |
1 files changed, 32 insertions, 21 deletions
diff --git a/workerpool.go b/workerpool.go index 84b1464..9b1987e 100644 --- a/workerpool.go +++ b/workerpool.go @@ -50,6 +50,11 @@ func (wp *workerPool) Start() { } wp.stopCh = make(chan struct{}) stopCh := wp.stopCh + wp.workerChanPool.New = func() interface{} { + return &workerChan{ + ch: make(chan net.Conn, workerChanCap), + } + } go func() { var scratch []*workerChan for { @@ -76,8 +81,8 @@ func (wp *workerPool) Stop() { // serving the connection and noticing wp.mustStop = true. wp.lock.Lock() ready := wp.ready - for i, ch := range ready { - ch.ch <- nil + for i := range ready { + ready[i].ch <- nil ready[i] = nil } wp.ready = ready[:0] @@ -97,23 +102,34 @@ func (wp *workerPool) clean(scratch *[]*workerChan) { // Clean least recently used workers if they didn't serve connections // for more than maxIdleWorkerDuration. - currentTime := time.Now() + criticalTime := time.Now().Add(-maxIdleWorkerDuration) wp.lock.Lock() ready := wp.ready n := len(ready) - i := 0 - for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration { - i++ - } - *scratch = append((*scratch)[:0], ready[:i]...) - if i > 0 { - m := copy(ready, ready[i:]) - for i = m; i < n; i++ { - ready[i] = nil + + // Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up. + l, r, mid := 0, n-1, 0 + for l <= r { + mid = (l + r) / 2 + if criticalTime.After(wp.ready[mid].lastUseTime) { + l = mid + 1 + } else { + r = mid - 1 } - wp.ready = ready[:m] } + i := r + if i == -1 { + wp.lock.Unlock() + return + } + + *scratch = append((*scratch)[:0], ready[:i+1]...) + m := copy(ready, ready[i+1:]) + for i = m; i < n; i++ { + ready[i] = nil + } + wp.ready = ready[:m] wp.lock.Unlock() // Notify obsolete workers to stop. @@ -121,8 +137,8 @@ func (wp *workerPool) clean(scratch *[]*workerChan) { // may be blocking and may consume a lot of time if many workers // are located on non-local CPUs. tmp := *scratch - for i, ch := range tmp { - ch.ch <- nil + for i := range tmp { + tmp[i].ch <- nil tmp[i] = nil } } @@ -174,11 +190,6 @@ func (wp *workerPool) getCh() *workerChan { return nil } vch := wp.workerChanPool.Get() - if vch == nil { - vch = &workerChan{ - ch: make(chan net.Conn, workerChanCap), - } - } ch = vch.(*workerChan) go func() { wp.workerFunc(ch) @@ -222,7 +233,7 @@ func (wp *workerPool) workerFunc(ch *workerChan) { if err == errHijacked { wp.connState(c, StateHijacked) } else { - c.Close() + _ = c.Close() wp.connState(c, StateClosed) } c = nil |