diff options
author | Kirill Danshin <kirill@danshin.pro> | 2021-02-06 23:03:23 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-02-06 23:03:23 +0300 |
commit | 0956208cc68009f2177e34af14d5ae766dae5734 (patch) | |
tree | 3f282b9ce34bca83144011c236c0e4ac38882b8e /streaming.go | |
parent | Add fasthttp.GenerateTestCertificate and use in tests (diff) | |
download | fasthttp-0956208cc68009f2177e34af14d5ae766dae5734.tar.gz fasthttp-0956208cc68009f2177e34af14d5ae766dae5734.tar.bz2 fasthttp-0956208cc68009f2177e34af14d5ae766dae5734.zip |
* Add request body streaming. Fixes #622
* Add test cases for StreamRequestBody
Co-authored-by: Kiyon <kiyonlin@163.com>
Co-authored-by: Erik Dubbelboer <erik@dubbelboer.com>
Co-authored-by: Fiber
Diffstat (limited to 'streaming.go')
-rw-r--r-- | streaming.go | 89 |
1 files changed, 89 insertions, 0 deletions
diff --git a/streaming.go b/streaming.go new file mode 100644 index 0000000..a6ad0a9 --- /dev/null +++ b/streaming.go @@ -0,0 +1,89 @@ +package fasthttp + +import ( + "bufio" + "bytes" + "fmt" + "io" + "sync" + + "github.com/valyala/bytebufferpool" +) + +type requestStream struct { + prefetchedBytes *bytes.Reader + reader *bufio.Reader + totalBytesRead int + contentLength int +} + +func (rs *requestStream) Read(p []byte) (int, error) { + if rs.contentLength == -1 { + p = p[:0] + strCRLFLen := len(strCRLF) + chunkSize, err := parseChunkSize(rs.reader) + if err != nil { + return len(p), err + } + p, err = appendBodyFixedSize(rs.reader, p, chunkSize+strCRLFLen) + if err != nil { + return len(p), err + } + if !bytes.Equal(p[len(p)-strCRLFLen:], strCRLF) { + return len(p), ErrBrokenChunk{ + error: fmt.Errorf("cannot find crlf at the end of chunk"), + } + } + p = p[:len(p)-strCRLFLen] + if chunkSize == 0 { + return len(p), io.EOF + } + return len(p), nil + } + if rs.totalBytesRead == rs.contentLength { + return 0, io.EOF + } + var n int + var err error + if int(rs.prefetchedBytes.Size()) > rs.totalBytesRead { + n, err := rs.prefetchedBytes.Read(p) + rs.totalBytesRead += n + if n == rs.contentLength { + return n, io.EOF + } + return n, err + } else { + n, err = rs.reader.Read(p) + rs.totalBytesRead += n + if err != nil { + return n, err + } + } + + if rs.totalBytesRead == rs.contentLength { + err = io.EOF + } + return n, err +} + +func acquireRequestStream(b *bytebufferpool.ByteBuffer, r *bufio.Reader, contentLength int) *requestStream { + rs := requestStreamPool.Get().(*requestStream) + rs.prefetchedBytes = bytes.NewReader(b.B) + rs.reader = r + rs.contentLength = contentLength + + return rs +} + +func releaseRequestStream(rs *requestStream) { + rs.prefetchedBytes = nil + rs.totalBytesRead = 0 + rs.reader = nil + requestStreamPool.Put(rs) +} + +var requestStreamPool = sync.Pool{ + New: func() interface{} { + return &requestStream{} + }, +} |