aboutsummaryrefslogtreecommitdiff
path: root/lbclient.go
diff options
context:
space:
mode:
authorGravatar Erik Dubbelboer <erik@dubbelboer.com> 2019-10-20 07:09:42 +0200
committerGravatar GitHub <noreply@github.com> 2019-10-20 07:09:42 +0200
commitbf98e3b75b59eaf04ead78c623e81fa0efc95809 (patch)
tree10187d25905cb445d2b90a3f18228e87d0dca1c3 /lbclient.go
parentDon't run TestClientNilResp in parallel (diff)
downloadfasthttp-bf98e3b75b59eaf04ead78c623e81fa0efc95809.tar.gz
fasthttp-bf98e3b75b59eaf04ead78c623e81fa0efc95809.tar.bz2
fasthttp-bf98e3b75b59eaf04ead78c623e81fa0efc95809.zip
Use least total connections instead of round robin for lbclient (#673)
Diffstat (limited to 'lbclient.go')
-rw-r--r--lbclient.go42
1 files changed, 12 insertions, 30 deletions
diff --git a/lbclient.go b/lbclient.go
index 12418b6..932ce97 100644
--- a/lbclient.go
+++ b/lbclient.go
@@ -17,7 +17,7 @@ type BalancingClient interface {
//
// It has the following features:
//
-// - Balances load among available clients using 'least loaded' + 'round robin'
+// - Balances load among available clients using 'least loaded' + 'least total'
// hybrid technique.
// - Dynamically decreases load on unhealthy clients.
//
@@ -49,10 +49,6 @@ type LBClient struct {
cs []*lbClient
- // nextIdx is for spreading requests among equally loaded clients
- // in a round-robin fashion.
- nextIdx uint32
-
once sync.Once
}
@@ -93,42 +89,23 @@ func (cc *LBClient) init() {
healthCheck: cc.HealthCheck,
})
}
-
- // Randomize nextIdx in order to prevent initial servers'
- // hammering from a cluster of identical LBClients.
- cc.nextIdx = uint32(time.Now().UnixNano())
}
func (cc *LBClient) get() *lbClient {
cc.once.Do(cc.init)
cs := cc.cs
- idx := atomic.AddUint32(&cc.nextIdx, 1)
- idx %= uint32(len(cs))
- minC := cs[idx]
+ minC := cs[0]
minN := minC.PendingRequests()
- if minN == 0 {
- return minC
- }
- for _, c := range cs[idx+1:] {
+ minT := atomic.LoadUint64(&minC.total)
+ for _, c := range cs[1:] {
n := c.PendingRequests()
- if n == 0 {
- return c
- }
- if n < minN {
- minC = c
- minN = n
- }
- }
- for _, c := range cs[:idx] {
- n := c.PendingRequests()
- if n == 0 {
- return c
- }
- if n < minN {
+ t := atomic.LoadUint64(&c.total)
+ if n < minN || (n == minN && t < minT) {
minC = c
minN = n
+ minT = t
}
}
return minC
@@ -138,6 +115,9 @@ type lbClient struct {
c BalancingClient
healthCheck func(req *Request, resp *Response, err error) bool
penalty uint32
+
+ // total amount of requests handled.
+ total uint64
}
func (c *lbClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
@@ -146,6 +126,8 @@ func (c *lbClient) DoDeadline(req *Request, resp *Response, deadline time.Time)
// Penalize the client returning error, so the next requests
// are routed to another clients.
time.AfterFunc(penaltyDuration, c.decPenalty)
+ } else {
+ atomic.AddUint64(&c.total, 1)
}
return err
}