aboutsummaryrefslogtreecommitdiff
path: root/streaming.go
diff options
context:
space:
mode:
authorGravatar Kirill Danshin <kirill@danshin.pro> 2021-02-06 23:03:23 +0300
committerGravatar GitHub <noreply@github.com> 2021-02-06 23:03:23 +0300
commit0956208cc68009f2177e34af14d5ae766dae5734 (patch)
tree3f282b9ce34bca83144011c236c0e4ac38882b8e /streaming.go
parentAdd fasthttp.GenerateTestCertificate and use in tests (diff)
downloadfasthttp-0956208cc68009f2177e34af14d5ae766dae5734.tar.gz
fasthttp-0956208cc68009f2177e34af14d5ae766dae5734.tar.bz2
fasthttp-0956208cc68009f2177e34af14d5ae766dae5734.zip
Add request body streaming. Fixes #622 (#911)
* Add request body streaming. Fixes #622 * Add test cases for StreamRequestBody Co-authored-by: Kiyon <kiyonlin@163.com> Co-authored-by: Erik Dubbelboer <erik@dubbelboer.com> Co-authored-by: Fiber
Diffstat (limited to 'streaming.go')
-rw-r--r--streaming.go89
1 files changed, 89 insertions, 0 deletions
diff --git a/streaming.go b/streaming.go
new file mode 100644
index 0000000..a6ad0a9
--- /dev/null
+++ b/streaming.go
@@ -0,0 +1,89 @@
+package fasthttp
+
+import (
+ "bufio"
+ "bytes"
+ "fmt"
+ "io"
+ "sync"
+
+ "github.com/valyala/bytebufferpool"
+)
+
+type requestStream struct {
+ prefetchedBytes *bytes.Reader
+ reader *bufio.Reader
+ totalBytesRead int
+ contentLength int
+}
+
+func (rs *requestStream) Read(p []byte) (int, error) {
+ if rs.contentLength == -1 {
+ p = p[:0]
+ strCRLFLen := len(strCRLF)
+ chunkSize, err := parseChunkSize(rs.reader)
+ if err != nil {
+ return len(p), err
+ }
+ p, err = appendBodyFixedSize(rs.reader, p, chunkSize+strCRLFLen)
+ if err != nil {
+ return len(p), err
+ }
+ if !bytes.Equal(p[len(p)-strCRLFLen:], strCRLF) {
+ return len(p), ErrBrokenChunk{
+ error: fmt.Errorf("cannot find crlf at the end of chunk"),
+ }
+ }
+ p = p[:len(p)-strCRLFLen]
+ if chunkSize == 0 {
+ return len(p), io.EOF
+ }
+ return len(p), nil
+ }
+ if rs.totalBytesRead == rs.contentLength {
+ return 0, io.EOF
+ }
+ var n int
+ var err error
+ if int(rs.prefetchedBytes.Size()) > rs.totalBytesRead {
+ n, err := rs.prefetchedBytes.Read(p)
+ rs.totalBytesRead += n
+ if n == rs.contentLength {
+ return n, io.EOF
+ }
+ return n, err
+ } else {
+ n, err = rs.reader.Read(p)
+ rs.totalBytesRead += n
+ if err != nil {
+ return n, err
+ }
+ }
+
+ if rs.totalBytesRead == rs.contentLength {
+ err = io.EOF
+ }
+ return n, err
+}
+
+func acquireRequestStream(b *bytebufferpool.ByteBuffer, r *bufio.Reader, contentLength int) *requestStream {
+ rs := requestStreamPool.Get().(*requestStream)
+ rs.prefetchedBytes = bytes.NewReader(b.B)
+ rs.reader = r
+ rs.contentLength = contentLength
+
+ return rs
+}
+
+func releaseRequestStream(rs *requestStream) {
+ rs.prefetchedBytes = nil
+ rs.totalBytesRead = 0
+ rs.reader = nil
+ requestStreamPool.Put(rs)
+}
+
+var requestStreamPool = sync.Pool{
+ New: func() interface{} {
+ return &requestStream{}
+ },
+}