diff options
author | Andy Pan <panjf2000@gmail.com> | 2022-11-17 13:31:03 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-17 06:31:03 +0100 |
commit | 8f434434e755413523bc6dbd05e4c031dffd2ffc (patch) | |
tree | 1a2a2187531813b255ad28200918cc7206d81cc8 /client.go | |
parent | Fix some potential pool leaks (#1433) (diff) | |
download | fasthttp-8f434434e755413523bc6dbd05e4c031dffd2ffc.tar.gz fasthttp-8f434434e755413523bc6dbd05e4c031dffd2ffc.tar.bz2 fasthttp-8f434434e755413523bc6dbd05e4c031dffd2ffc.zip |
Wait for the response of pipelineWork in background and return it to pool (#1436)
Diffstat (limited to 'client.go')
-rw-r--r-- | client.go | 80 |
1 files changed, 40 insertions, 40 deletions
@@ -2376,7 +2376,7 @@ func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline t req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...) } - w := acquirePipelineWork(&c.workPool, timeout) + w := c.acquirePipelineWork(timeout) w.respCopy.Header.disableNormalizing = c.DisableHeaderNamesNormalizing w.req = &w.reqCopy w.resp = &w.respCopy @@ -2394,7 +2394,7 @@ func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline t select { case c.chW <- w: case <-w.t.C: - releasePipelineWork(&c.workPool, w) + c.releasePipelineWork(w) return ErrTimeout } } @@ -2408,7 +2408,7 @@ func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline t swapResponseBody(resp, &w.respCopy) } err = w.err - releasePipelineWork(&c.workPool, w) + c.releasePipelineWork(w) case <-w.t.C: err = ErrTimeout } @@ -2416,6 +2416,40 @@ func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline t return err } +func (c *pipelineConnClient) acquirePipelineWork(timeout time.Duration) (w *pipelineWork) { + v := c.workPool.Get() + if v != nil { + w = v.(*pipelineWork) + } else { + w = &pipelineWork{ + done: make(chan struct{}, 1), + } + } + if timeout > 0 { + if w.t == nil { + w.t = time.NewTimer(timeout) + } else { + w.t.Reset(timeout) + } + w.deadline = time.Now().Add(timeout) + } else { + w.deadline = zeroTime + } + return w +} + +func (c *pipelineConnClient) releasePipelineWork(w *pipelineWork) { + if w.t != nil { + w.t.Stop() + } + w.reqCopy.Reset() + w.respCopy.Reset() + w.req = nil + w.resp = nil + w.err = nil + c.workPool.Put(w) +} + // Do performs the given http request and sets the corresponding response. // // Request must contain at least non-zero RequestURI with full url (including @@ -2443,7 +2477,7 @@ func (c *pipelineConnClient) Do(req *Request, resp *Response) error { req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...) } - w := acquirePipelineWork(&c.workPool, 0) + w := c.acquirePipelineWork(0) w.req = req if resp != nil { resp.Header.disableNormalizing = c.DisableHeaderNamesNormalizing @@ -2466,7 +2500,7 @@ func (c *pipelineConnClient) Do(req *Request, resp *Response) error { select { case c.chW <- w: default: - releasePipelineWork(&c.workPool, w) + c.releasePipelineWork(w) return ErrPipelineOverflow } } @@ -2475,7 +2509,7 @@ func (c *pipelineConnClient) Do(req *Request, resp *Response) error { <-w.done err := w.err - releasePipelineWork(&c.workPool, w) + c.releasePipelineWork(w) return err } @@ -2852,37 +2886,3 @@ func (c *pipelineConnClient) getClientName() []byte { } var errPipelineConnStopped = errors.New("pipeline connection has been stopped") - -func acquirePipelineWork(pool *sync.Pool, timeout time.Duration) (w *pipelineWork) { - v := pool.Get() - if v != nil { - w = v.(*pipelineWork) - } else { - w = &pipelineWork{ - done: make(chan struct{}, 1), - } - } - if timeout > 0 { - if w.t == nil { - w.t = time.NewTimer(timeout) - } else { - w.t.Reset(timeout) - } - w.deadline = time.Now().Add(timeout) - } else { - w.deadline = zeroTime - } - return w -} - -func releasePipelineWork(pool *sync.Pool, w *pipelineWork) { - if w.t != nil { - w.t.Stop() - } - w.reqCopy.Reset() - w.respCopy.Reset() - w.req = nil - w.resp = nil - w.err = nil - pool.Put(w) -} |