aboutsummaryrefslogtreecommitdiff
path: root/tcpdialer.go
diff options
context:
space:
mode:
authorGravatar Aliaksandr Valialkin <valyala@gmail.com> 2016-01-29 13:41:07 +0200
committerGravatar Aliaksandr Valialkin <valyala@gmail.com> 2016-01-29 13:41:07 +0200
commit9f42f3fe8af4f53f2f43b7a773a97b25fc7c4568 (patch)
tree1b536065a02f959e1742488ef5006f44dc968456 /tcpdialer.go
parentClient: made more clear timeout handling (diff)
downloadfasthttp-9f42f3fe8af4f53f2f43b7a773a97b25fc7c4568.tar.gz
fasthttp-9f42f3fe8af4f53f2f43b7a773a97b25fc7c4568.tar.bz2
fasthttp-9f42f3fe8af4f53f2f43b7a773a97b25fc7c4568.zip
tcpdialer: limit the number of concurrent dialers
Diffstat (limited to 'tcpdialer.go')
-rw-r--r--tcpdialer.go38
1 files changed, 30 insertions, 8 deletions
diff --git a/tcpdialer.go b/tcpdialer.go
index e96fdb8..b074d00 100644
--- a/tcpdialer.go
+++ b/tcpdialer.go
@@ -155,11 +155,16 @@ type tcpDialer struct {
tcpAddrsLock sync.Mutex
tcpAddrsMap map[string]*tcpAddrEntry
+ concurrencyCh chan struct{}
+
once sync.Once
}
+const maxDialConcurrency = 1000
+
func (d *tcpDialer) NewDial(timeout time.Duration) DialFunc {
d.once.Do(func() {
+ d.concurrencyCh = make(chan struct{}, maxDialConcurrency)
d.tcpAddrsMap = make(map[string]*tcpAddrEntry)
go d.tcpAddrsClean()
})
@@ -175,17 +180,15 @@ func (d *tcpDialer) NewDial(timeout time.Duration) DialFunc {
}
var conn net.Conn
- startTime := time.Now()
n := uint32(len(addrs))
- timeoutRemaining := timeout
+ deadline := time.Now().Add(timeout)
for n > 0 {
- conn, err = tryDial(network, &addrs[idx%n], timeoutRemaining)
+ conn, err = tryDial(network, &addrs[idx%n], deadline, d.concurrencyCh)
if err == nil {
return conn, nil
}
- timeoutRemaining -= time.Since(startTime)
- if timeoutRemaining <= 0 {
- return nil, ErrDialTimeout
+ if err == ErrDialTimeout {
+ return nil, err
}
idx++
n--
@@ -194,13 +197,32 @@ func (d *tcpDialer) NewDial(timeout time.Duration) DialFunc {
}
}
-func tryDial(network string, addr *net.TCPAddr, timeout time.Duration) (net.Conn, error) {
+func tryDial(network string, addr *net.TCPAddr, deadline time.Time, concurrencyCh chan struct{}) (net.Conn, error) {
+ timeout := -time.Since(deadline)
+ if timeout <= 0 {
+ return nil, ErrDialTimeout
+ }
+
+ select {
+ case concurrencyCh <- struct{}{}:
+ case <-time.After(timeout):
+ return nil, ErrDialTimeout
+ }
+
+ timeout = -time.Since(deadline)
+ if timeout <= 0 {
+ <-concurrencyCh
+ return nil, ErrDialTimeout
+ }
+
ch := make(chan dialResult, 1)
go func() {
var dr dialResult
dr.conn, dr.err = net.DialTCP(network, nil, addr)
ch <- dr
+ <-concurrencyCh
}()
+
select {
case dr := <-ch:
return dr.conn, dr.err
@@ -219,7 +241,7 @@ var ErrDialTimeout = errors.New("dialing to the given TCP address timed out")
// DefaultDialTimeout is timeout used by Dial and DialDualStack
// for establishing TCP connections.
-const DefaultDialTimeout = 60 * time.Second
+const DefaultDialTimeout = 3 * time.Second
type tcpAddrEntry struct {
addrs []net.TCPAddr