aboutsummaryrefslogtreecommitdiff
path: root/lbclient.go
diff options
context:
space:
mode:
authorGravatar Aliaksandr Valialkin <valyala@gmail.com> 2016-10-20 13:48:44 +0300
committerGravatar Aliaksandr Valialkin <valyala@gmail.com> 2016-10-20 13:49:00 +0300
commitb5d497902b4fc3e2233d3f745275f8dbd2483b44 (patch)
tree6fe8ef730f0a158c10b22ebc8dea14a76b9c7866 /lbclient.go
parentUnshadow err in client test (diff)
downloadfasthttp-b5d497902b4fc3e2233d3f745275f8dbd2483b44.tar.gz
fasthttp-b5d497902b4fc3e2233d3f745275f8dbd2483b44.tar.bz2
fasthttp-b5d497902b4fc3e2233d3f745275f8dbd2483b44.zip
Added LBClient for balancing load among multiple clients
This will be used by httptp from https://github.com/valyala/httpteleport .
Diffstat (limited to 'lbclient.go')
-rw-r--r--lbclient.go174
1 files changed, 174 insertions, 0 deletions
diff --git a/lbclient.go b/lbclient.go
new file mode 100644
index 0000000..ea81205
--- /dev/null
+++ b/lbclient.go
@@ -0,0 +1,174 @@
+package fasthttp
+
+import (
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// BalancingClient is the interface for clients, which may be passed
+// to LBClient.Clients.
+type BalancingClient interface {
+ DoDeadline(req *Request, resp *Response, deadline time.Time) error
+ PendingRequests() int
+}
+
+// LBClient balances requests among available LBClient.Clients.
+//
+// It has the following features:
+//
+// - Balance load among available clients using 'least loaded' + 'round robin'
+// hybrid technique.
+// - Decrease load on unhealthy clients.
+//
+// LBClient methods are safe for calling from concurrently running goroutines.
+type LBClient struct {
+ noCopy noCopy
+
+ // Clients must contain non-zero clients list.
+ // Incoming requests are balanced among these clients.
+ Clients []BalancingClient
+
+ // HealthCheck is a callback called after each request.
+ //
+ // The request, response and the error returned by the client
+ // is passed to HealthCheck, so the callback may determine whether
+ // the client is healthy.
+ //
+ // Load on the current client is decreased if HealthCheck returns false.
+ //
+ // By default HealthCheck returns false if err != nil.
+ HealthCheck func(req *Request, resp *Response, err error) bool
+
+ // Timeout is the request timeout used when calling LBClient.Do.
+ //
+ // DefaultLBClientTimeout is used by default.
+ Timeout time.Duration
+
+ cs []*lbClient
+
+ // nextIdx is for spreading requests among equally loaded clients
+ // in a round-robin fashion.
+ nextIdx uint32
+
+ once sync.Once
+}
+
+// DefaultLBClientTimeout is the default request timeout used by LBClient
+// when calling LBClient.Do.
+//
+// The timeout may be overriden via LBClient.Timeout.
+const DefaultLBClientTimeout = time.Second
+
+// DoDeadline calls DoDeadline on the least loaded client
+func (cc *LBClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
+ return cc.get().DoDeadline(req, resp, deadline)
+}
+
+// DoTimeout calculates deadline and calls DoDeadline on the least loaded client
+func (cc *LBClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
+ deadline := time.Now().Add(timeout)
+ return cc.get().DoDeadline(req, resp, deadline)
+}
+
+// Do calls calculates deadline using LBClient.Timeout and calls DoDeadline
+// on the least loaded client.
+func (cc *LBClient) Do(req *Request, resp *Response) error {
+ timeout := cc.Timeout
+ if timeout <= 0 {
+ timeout = DefaultLBClientTimeout
+ }
+ return cc.DoTimeout(req, resp, timeout)
+}
+
+func (cc *LBClient) init() {
+ for _, c := range cc.Clients {
+ cc.cs = append(cc.cs, &lbClient{
+ c: c,
+ healthCheck: cc.HealthCheck,
+ })
+ }
+}
+
+func (cc *LBClient) get() *lbClient {
+ cc.once.Do(cc.init)
+
+ cs := cc.cs
+ idx := atomic.AddUint32(&cc.nextIdx, 1)
+ idx %= uint32(len(cs))
+
+ minC := cs[idx]
+ minN := minC.PendingRequests()
+ if minN == 0 {
+ return minC
+ }
+ for _, c := range cs[idx+1:] {
+ n := c.PendingRequests()
+ if n == 0 {
+ return c
+ }
+ if n < minN {
+ minC = c
+ minN = n
+ }
+ }
+ for _, c := range cs[:idx] {
+ n := c.PendingRequests()
+ if n == 0 {
+ return c
+ }
+ if n < minN {
+ minC = c
+ minN = n
+ }
+ }
+ return minC
+}
+
+type lbClient struct {
+ c BalancingClient
+ healthCheck func(req *Request, resp *Response, err error) bool
+ penalty uint32
+}
+
+func (c *lbClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
+ err := c.c.DoDeadline(req, resp, deadline)
+ if !c.isHealthy(req, resp, err) && c.incPenalty() {
+ // Penalize the client returning error, so the next requests
+ // are routed to another clients.
+ time.AfterFunc(penaltyDuration, c.decPenalty)
+ }
+ return err
+}
+
+func (c *lbClient) PendingRequests() int {
+ n := c.c.PendingRequests()
+ m := atomic.LoadUint32(&c.penalty)
+ return n + int(m)
+}
+
+func (c *lbClient) isHealthy(req *Request, resp *Response, err error) bool {
+ if c.healthCheck == nil {
+ return err == nil
+ }
+ return c.healthCheck(req, resp, err)
+}
+
+func (c *lbClient) incPenalty() bool {
+ m := atomic.AddUint32(&c.penalty, 1)
+ if m > maxPenalty {
+ c.decPenalty()
+ return false
+ }
+ return true
+}
+
+func (c *lbClient) decPenalty() {
+ atomic.AddUint32(&c.penalty, ^uint32(0))
+}
+
+const (
+ maxPenalty = 300
+
+ penaltyDuration = 3 * time.Second
+)