aboutsummaryrefslogtreecommitdiff
path: root/workerpool.go
diff options
context:
space:
mode:
authorGravatar Andy Pan <panjf2000@gmail.com> 2019-10-28 02:27:31 +0800
committerGravatar Erik Dubbelboer <erik@dubbelboer.com> 2019-10-28 02:27:31 +0800
commit9f11af296864153ee45341d3f2fe0f5178fd6210 (patch)
treedef788b7807a60bc9942d56e265dfec8b59eab67 /workerpool.go
parentRequests with incomplete bodies no longer cause log noise (#682) (diff)
downloadfasthttp-9f11af296864153ee45341d3f2fe0f5178fd6210.tar.gz
fasthttp-9f11af296864153ee45341d3f2fe0f5178fd6210.tar.bz2
fasthttp-9f11af296864153ee45341d3f2fe0f5178fd6210.zip
Make several optimizations to worker pool (#680)
* Use binary-search algorithm to speed up cleaning up workers * Speed it up when iterating the slice of workerChan * Use sync.Pool as a more canonical way * Add benchmark test between binary-search and linear search * Optimize range to the slice of workerChan, avoiding elements copy * Perfect the benchmark of work pool * Make binary-search code inline and remove benchmark test code
Diffstat (limited to 'workerpool.go')
-rw-r--r--workerpool.go53
1 files changed, 32 insertions, 21 deletions
diff --git a/workerpool.go b/workerpool.go
index 84b1464..9b1987e 100644
--- a/workerpool.go
+++ b/workerpool.go
@@ -50,6 +50,11 @@ func (wp *workerPool) Start() {
}
wp.stopCh = make(chan struct{})
stopCh := wp.stopCh
+ wp.workerChanPool.New = func() interface{} {
+ return &workerChan{
+ ch: make(chan net.Conn, workerChanCap),
+ }
+ }
go func() {
var scratch []*workerChan
for {
@@ -76,8 +81,8 @@ func (wp *workerPool) Stop() {
// serving the connection and noticing wp.mustStop = true.
wp.lock.Lock()
ready := wp.ready
- for i, ch := range ready {
- ch.ch <- nil
+ for i := range ready {
+ ready[i].ch <- nil
ready[i] = nil
}
wp.ready = ready[:0]
@@ -97,23 +102,34 @@ func (wp *workerPool) clean(scratch *[]*workerChan) {
// Clean least recently used workers if they didn't serve connections
// for more than maxIdleWorkerDuration.
- currentTime := time.Now()
+ criticalTime := time.Now().Add(-maxIdleWorkerDuration)
wp.lock.Lock()
ready := wp.ready
n := len(ready)
- i := 0
- for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
- i++
- }
- *scratch = append((*scratch)[:0], ready[:i]...)
- if i > 0 {
- m := copy(ready, ready[i:])
- for i = m; i < n; i++ {
- ready[i] = nil
+
+ // Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
+ l, r, mid := 0, n-1, 0
+ for l <= r {
+ mid = (l + r) / 2
+ if criticalTime.After(wp.ready[mid].lastUseTime) {
+ l = mid + 1
+ } else {
+ r = mid - 1
}
- wp.ready = ready[:m]
}
+ i := r
+ if i == -1 {
+ wp.lock.Unlock()
+ return
+ }
+
+ *scratch = append((*scratch)[:0], ready[:i+1]...)
+ m := copy(ready, ready[i+1:])
+ for i = m; i < n; i++ {
+ ready[i] = nil
+ }
+ wp.ready = ready[:m]
wp.lock.Unlock()
// Notify obsolete workers to stop.
@@ -121,8 +137,8 @@ func (wp *workerPool) clean(scratch *[]*workerChan) {
// may be blocking and may consume a lot of time if many workers
// are located on non-local CPUs.
tmp := *scratch
- for i, ch := range tmp {
- ch.ch <- nil
+ for i := range tmp {
+ tmp[i].ch <- nil
tmp[i] = nil
}
}
@@ -174,11 +190,6 @@ func (wp *workerPool) getCh() *workerChan {
return nil
}
vch := wp.workerChanPool.Get()
- if vch == nil {
- vch = &workerChan{
- ch: make(chan net.Conn, workerChanCap),
- }
- }
ch = vch.(*workerChan)
go func() {
wp.workerFunc(ch)
@@ -222,7 +233,7 @@ func (wp *workerPool) workerFunc(ch *workerChan) {
if err == errHijacked {
wp.connState(c, StateHijacked)
} else {
- c.Close()
+ _ = c.Close()
wp.connState(c, StateClosed)
}
c = nil