diff options
author | Aliaksandr Valialkin <valyala@gmail.com> | 2016-10-20 13:48:44 +0300 |
---|---|---|
committer | Aliaksandr Valialkin <valyala@gmail.com> | 2016-10-20 13:49:00 +0300 |
commit | b5d497902b4fc3e2233d3f745275f8dbd2483b44 (patch) | |
tree | 6fe8ef730f0a158c10b22ebc8dea14a76b9c7866 /lbclient.go | |
parent | Unshadow err in client test (diff) | |
download | fasthttp-b5d497902b4fc3e2233d3f745275f8dbd2483b44.tar.gz fasthttp-b5d497902b4fc3e2233d3f745275f8dbd2483b44.tar.bz2 fasthttp-b5d497902b4fc3e2233d3f745275f8dbd2483b44.zip |
Added LBClient for balancing load among multiple clients
This will be used by httptp from https://github.com/valyala/httpteleport .
Diffstat (limited to 'lbclient.go')
-rw-r--r-- | lbclient.go | 174 |
1 files changed, 174 insertions, 0 deletions
diff --git a/lbclient.go b/lbclient.go new file mode 100644 index 0000000..ea81205 --- /dev/null +++ b/lbclient.go @@ -0,0 +1,174 @@ +package fasthttp + +import ( + "sync" + "sync/atomic" + "time" +) + +// BalancingClient is the interface for clients, which may be passed +// to LBClient.Clients. +type BalancingClient interface { + DoDeadline(req *Request, resp *Response, deadline time.Time) error + PendingRequests() int +} + +// LBClient balances requests among available LBClient.Clients. +// +// It has the following features: +// +// - Balance load among available clients using 'least loaded' + 'round robin' +// hybrid technique. +// - Decrease load on unhealthy clients. +// +// LBClient methods are safe for calling from concurrently running goroutines. +type LBClient struct { + noCopy noCopy + + // Clients must contain non-zero clients list. + // Incoming requests are balanced among these clients. + Clients []BalancingClient + + // HealthCheck is a callback called after each request. + // + // The request, response and the error returned by the client + // is passed to HealthCheck, so the callback may determine whether + // the client is healthy. + // + // Load on the current client is decreased if HealthCheck returns false. + // + // By default HealthCheck returns false if err != nil. + HealthCheck func(req *Request, resp *Response, err error) bool + + // Timeout is the request timeout used when calling LBClient.Do. + // + // DefaultLBClientTimeout is used by default. + Timeout time.Duration + + cs []*lbClient + + // nextIdx is for spreading requests among equally loaded clients + // in a round-robin fashion. + nextIdx uint32 + + once sync.Once +} + +// DefaultLBClientTimeout is the default request timeout used by LBClient +// when calling LBClient.Do. +// +// The timeout may be overriden via LBClient.Timeout. +const DefaultLBClientTimeout = time.Second + +// DoDeadline calls DoDeadline on the least loaded client +func (cc *LBClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error { + return cc.get().DoDeadline(req, resp, deadline) +} + +// DoTimeout calculates deadline and calls DoDeadline on the least loaded client +func (cc *LBClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + return cc.get().DoDeadline(req, resp, deadline) +} + +// Do calls calculates deadline using LBClient.Timeout and calls DoDeadline +// on the least loaded client. +func (cc *LBClient) Do(req *Request, resp *Response) error { + timeout := cc.Timeout + if timeout <= 0 { + timeout = DefaultLBClientTimeout + } + return cc.DoTimeout(req, resp, timeout) +} + +func (cc *LBClient) init() { + for _, c := range cc.Clients { + cc.cs = append(cc.cs, &lbClient{ + c: c, + healthCheck: cc.HealthCheck, + }) + } +} + +func (cc *LBClient) get() *lbClient { + cc.once.Do(cc.init) + + cs := cc.cs + idx := atomic.AddUint32(&cc.nextIdx, 1) + idx %= uint32(len(cs)) + + minC := cs[idx] + minN := minC.PendingRequests() + if minN == 0 { + return minC + } + for _, c := range cs[idx+1:] { + n := c.PendingRequests() + if n == 0 { + return c + } + if n < minN { + minC = c + minN = n + } + } + for _, c := range cs[:idx] { + n := c.PendingRequests() + if n == 0 { + return c + } + if n < minN { + minC = c + minN = n + } + } + return minC +} + +type lbClient struct { + c BalancingClient + healthCheck func(req *Request, resp *Response, err error) bool + penalty uint32 +} + +func (c *lbClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error { + err := c.c.DoDeadline(req, resp, deadline) + if !c.isHealthy(req, resp, err) && c.incPenalty() { + // Penalize the client returning error, so the next requests + // are routed to another clients. + time.AfterFunc(penaltyDuration, c.decPenalty) + } + return err +} + +func (c *lbClient) PendingRequests() int { + n := c.c.PendingRequests() + m := atomic.LoadUint32(&c.penalty) + return n + int(m) +} + +func (c *lbClient) isHealthy(req *Request, resp *Response, err error) bool { + if c.healthCheck == nil { + return err == nil + } + return c.healthCheck(req, resp, err) +} + +func (c *lbClient) incPenalty() bool { + m := atomic.AddUint32(&c.penalty, 1) + if m > maxPenalty { + c.decPenalty() + return false + } + return true +} + +func (c *lbClient) decPenalty() { + atomic.AddUint32(&c.penalty, ^uint32(0)) +} + +const ( + maxPenalty = 300 + + penaltyDuration = 3 * time.Second +) |