diff options
author | Aliaksandr Valialkin <valyala@gmail.com> | 2016-01-29 13:41:07 +0200 |
---|---|---|
committer | Aliaksandr Valialkin <valyala@gmail.com> | 2016-01-29 13:41:07 +0200 |
commit | 9f42f3fe8af4f53f2f43b7a773a97b25fc7c4568 (patch) | |
tree | 1b536065a02f959e1742488ef5006f44dc968456 /tcpdialer.go | |
parent | Client: made more clear timeout handling (diff) | |
download | fasthttp-9f42f3fe8af4f53f2f43b7a773a97b25fc7c4568.tar.gz fasthttp-9f42f3fe8af4f53f2f43b7a773a97b25fc7c4568.tar.bz2 fasthttp-9f42f3fe8af4f53f2f43b7a773a97b25fc7c4568.zip |
tcpdialer: limit the number of concurrent dialers
Diffstat (limited to 'tcpdialer.go')
-rw-r--r-- | tcpdialer.go | 38 |
1 files changed, 30 insertions, 8 deletions
diff --git a/tcpdialer.go b/tcpdialer.go index e96fdb8..b074d00 100644 --- a/tcpdialer.go +++ b/tcpdialer.go @@ -155,11 +155,16 @@ type tcpDialer struct { tcpAddrsLock sync.Mutex tcpAddrsMap map[string]*tcpAddrEntry + concurrencyCh chan struct{} + once sync.Once } +const maxDialConcurrency = 1000 + func (d *tcpDialer) NewDial(timeout time.Duration) DialFunc { d.once.Do(func() { + d.concurrencyCh = make(chan struct{}, maxDialConcurrency) d.tcpAddrsMap = make(map[string]*tcpAddrEntry) go d.tcpAddrsClean() }) @@ -175,17 +180,15 @@ func (d *tcpDialer) NewDial(timeout time.Duration) DialFunc { } var conn net.Conn - startTime := time.Now() n := uint32(len(addrs)) - timeoutRemaining := timeout + deadline := time.Now().Add(timeout) for n > 0 { - conn, err = tryDial(network, &addrs[idx%n], timeoutRemaining) + conn, err = tryDial(network, &addrs[idx%n], deadline, d.concurrencyCh) if err == nil { return conn, nil } - timeoutRemaining -= time.Since(startTime) - if timeoutRemaining <= 0 { - return nil, ErrDialTimeout + if err == ErrDialTimeout { + return nil, err } idx++ n-- @@ -194,13 +197,32 @@ func (d *tcpDialer) NewDial(timeout time.Duration) DialFunc { } } -func tryDial(network string, addr *net.TCPAddr, timeout time.Duration) (net.Conn, error) { +func tryDial(network string, addr *net.TCPAddr, deadline time.Time, concurrencyCh chan struct{}) (net.Conn, error) { + timeout := -time.Since(deadline) + if timeout <= 0 { + return nil, ErrDialTimeout + } + + select { + case concurrencyCh <- struct{}{}: + case <-time.After(timeout): + return nil, ErrDialTimeout + } + + timeout = -time.Since(deadline) + if timeout <= 0 { + <-concurrencyCh + return nil, ErrDialTimeout + } + ch := make(chan dialResult, 1) go func() { var dr dialResult dr.conn, dr.err = net.DialTCP(network, nil, addr) ch <- dr + <-concurrencyCh }() + select { case dr := <-ch: return dr.conn, dr.err @@ -219,7 +241,7 @@ var ErrDialTimeout = errors.New("dialing to the given TCP address timed out") // DefaultDialTimeout is timeout used by Dial and DialDualStack // for establishing TCP connections. -const DefaultDialTimeout = 60 * time.Second +const DefaultDialTimeout = 3 * time.Second type tcpAddrEntry struct { addrs []net.TCPAddr |