aboutsummaryrefslogtreecommitdiff
path: root/fs.go
diff options
context:
space:
mode:
authorGravatar byene0923 <34917110+byene0923@users.noreply.github.com> 2022-10-30 16:48:46 +0800
committerGravatar GitHub <noreply@github.com> 2022-10-30 09:48:46 +0100
commita468a7dd3734d9866ef6ab8ee1e36695f5c3b09c (patch)
tree0527f64b11e07d2841a25c073f1d8e2e0dbebb55 /fs.go
parentfeat: add PeekKeys and PeekTrailerKeys (#1405) (diff)
downloadfasthttp-a468a7dd3734d9866ef6ab8ee1e36695f5c3b09c.tar.gz
fasthttp-a468a7dd3734d9866ef6ab8ee1e36695f5c3b09c.tar.bz2
fasthttp-a468a7dd3734d9866ef6ab8ee1e36695f5c3b09c.zip
feat: support multi/range (#1398)
* feat: support multi/range * fix: 1. lint code 2. add SetByteRanges method * fix: reduce the test number of testFSSingleByteRange
Diffstat (limited to 'fs.go')
-rw-r--r--fs.go787
1 files changed, 665 insertions, 122 deletions
diff --git a/fs.go b/fs.go
index a8bcd13..bb3ee07 100644
--- a/fs.go
+++ b/fs.go
@@ -2,6 +2,7 @@ package fasthttp
import (
"bytes"
+ "crypto/rand"
"errors"
"fmt"
"html"
@@ -501,7 +502,8 @@ type fsHandler struct {
cacheGzip map[string]*fsFile
cacheLock sync.Mutex
- smallFileReaderPool sync.Pool
+ smallFileReaderPool sync.Pool
+ smallRangeReaderPool sync.Pool
}
type fsFile struct {
@@ -520,6 +522,9 @@ type fsFile struct {
bigFiles []*bigFileReader
bigFilesLock sync.Mutex
+
+ rangeBigFiles []*bigRangeReader
+ rangeBigFilesLock sync.Mutex
}
func (ff *fsFile) NewReader() (io.Reader, error) {
@@ -540,9 +545,8 @@ func (ff *fsFile) smallFileReader() (io.Reader, error) {
}
r := v.(*fsSmallFileReader)
r.ff = ff
- r.endPos = ff.contentLength
- if r.startPos > 0 {
- return nil, errors.New("bug: fsSmallFileReader with non-nil startPos found in the pool")
+ if r.offset > 0 {
+ panic("BUG: fsSmallFileReader with non-nil offset found in the pool")
}
return r, nil
}
@@ -580,7 +584,6 @@ func (ff *fsFile) bigFileReader() (io.Reader, error) {
return &bigFileReader{
f: f,
ff: ff,
- r: f,
}, nil
}
@@ -607,143 +610,175 @@ func (ff *fsFile) decReadersCount() {
ff.h.cacheLock.Unlock()
}
+func (ff *fsFile) NewRangeReader() (io.Reader, error) {
+ if ff.isBig() {
+ r, err := ff.bigRangeReader()
+ if err != nil {
+ ff.decReadersCount()
+ }
+ return r, err
+ }
+ return ff.smallRangeReader(), nil
+}
+
+func (ff *fsFile) smallRangeReader() io.Reader {
+ v := ff.h.smallRangeReaderPool.Get()
+ if v == nil {
+ v = &smallRangeReader{
+ sta: make([]int, 64),
+ end: make([]int, 64),
+ buf: make([]byte, 4096),
+ }
+ }
+ r := v.(*smallRangeReader)
+ r.ff = ff
+ r.sta, r.end, r.buf = r.sta[:0], r.end[:0], r.buf[:0]
+ r.hf = true
+ r.he = false
+ r.hasCurRangeBodyHeader = false
+ r.cRange = 0
+ r.boundary = randomBoundary()
+ return r
+}
+
+func (ff *fsFile) bigRangeReader() (io.Reader, error) {
+ if ff.f == nil {
+ panic("BUG: ff.f must be non-nil in bigRangeReader")
+ }
+ var r io.Reader
+
+ ff.rangeBigFilesLock.Lock()
+ n := len(ff.rangeBigFiles)
+ if n > 0 {
+ r = ff.rangeBigFiles[n-1]
+ ff.rangeBigFiles = ff.rangeBigFiles[:n-1]
+ }
+ ff.rangeBigFilesLock.Unlock()
+
+ if r != nil {
+ return r, nil
+ }
+ f, err := os.Open(ff.f.Name())
+ if err != nil {
+ return nil, fmt.Errorf("cannot open already opened file: %s", err)
+ }
+ rr := &bigRangeReader{
+ ff: ff,
+ f: f,
+ sta: make([]int, 64),
+ end: make([]int, 64),
+ buf: make([]byte, 4096),
+ hf: true,
+ he: false,
+ hasCurRangeBodyHeader: false,
+ cRange: 0,
+ boundary: randomBoundary(),
+ }
+ rr.sta, rr.end, rr.buf = rr.sta[:0], rr.end[:0], rr.buf[:0]
+ return rr, nil
+}
+
// bigFileReader attempts to trigger sendfile
// for sending big files over the wire.
type bigFileReader struct {
f *os.File
ff *fsFile
- r io.Reader
- lr io.LimitedReader
-}
-
-func (r *bigFileReader) UpdateByteRange(startPos, endPos int) error {
- if _, err := r.f.Seek(int64(startPos), 0); err != nil {
- return err
- }
- r.r = &r.lr
- r.lr.R = r.f
- r.lr.N = int64(endPos - startPos + 1)
- return nil
}
func (r *bigFileReader) Read(p []byte) (int, error) {
- return r.r.Read(p)
+ return r.f.Read(p)
}
func (r *bigFileReader) WriteTo(w io.Writer) (int64, error) {
if rf, ok := w.(io.ReaderFrom); ok {
// fast path. Send file must be triggered
- return rf.ReadFrom(r.r)
+ return rf.ReadFrom(r.f)
}
// slow path
- return copyZeroAlloc(w, r.r)
+ return copyZeroAlloc(w, r.f)
}
func (r *bigFileReader) Close() error {
- r.r = r.f
n, err := r.f.Seek(0, 0)
if err == nil {
- if n == 0 {
- ff := r.ff
- ff.bigFilesLock.Lock()
- ff.bigFiles = append(ff.bigFiles, r)
- ff.bigFilesLock.Unlock()
- } else {
- _ = r.f.Close()
- err = errors.New("bug: File.Seek(0,0) returned (non-zero, nil)")
+ if n != 0 {
+ panic("BUG: File.Seek(0,0) returned (non-zero, nil)")
}
+ ff := r.ff
+ ff.bigFilesLock.Lock()
+ ff.bigFiles = append(ff.bigFiles, r)
+ ff.bigFilesLock.Unlock()
} else {
- _ = r.f.Close()
+ r.f.Close()
}
r.ff.decReadersCount()
return err
}
type fsSmallFileReader struct {
- ff *fsFile
- startPos int
- endPos int
+ ff *fsFile
+ offset int64
}
func (r *fsSmallFileReader) Close() error {
ff := r.ff
ff.decReadersCount()
r.ff = nil
- r.startPos = 0
- r.endPos = 0
+ r.offset = 0
ff.h.smallFileReaderPool.Put(r)
return nil
}
-func (r *fsSmallFileReader) UpdateByteRange(startPos, endPos int) error {
- r.startPos = startPos
- r.endPos = endPos + 1
- return nil
-}
-
func (r *fsSmallFileReader) Read(p []byte) (int, error) {
- tailLen := r.endPos - r.startPos
- if tailLen <= 0 {
- return 0, io.EOF
- }
- if len(p) > tailLen {
- p = p[:tailLen]
- }
-
ff := r.ff
+
if ff.f != nil {
- n, err := ff.f.ReadAt(p, int64(r.startPos))
- r.startPos += n
+ n, err := ff.f.ReadAt(p, r.offset)
+ r.offset += int64(n)
return n, err
}
-
- n := copy(p, ff.dirIndex[r.startPos:])
- r.startPos += n
+ if r.offset == int64(len(ff.dirIndex)) {
+ return 0, io.EOF
+ }
+ n := copy(p, ff.dirIndex[r.offset:])
+ r.offset += int64(n)
return n, nil
}
func (r *fsSmallFileReader) WriteTo(w io.Writer) (int64, error) {
+ if r.offset != 0 {
+ panic("BUG: no-zero offset! Read() mustn't be called before WriteTo()")
+ }
ff := r.ff
var n int
var err error
if ff.f == nil {
- n, err = w.Write(ff.dirIndex[r.startPos:r.endPos])
+ n, err = w.Write(ff.dirIndex)
return int64(n), err
}
-
if rf, ok := w.(io.ReaderFrom); ok {
return rf.ReadFrom(r)
}
-
- curPos := r.startPos
bufv := copyBufPool.Get()
buf := bufv.([]byte)
for err == nil {
- tailLen := r.endPos - curPos
- if tailLen <= 0 {
- break
- }
- if len(buf) > tailLen {
- buf = buf[:tailLen]
- }
- n, err = ff.f.ReadAt(buf, int64(curPos))
+ n, err = ff.f.ReadAt(buf, r.offset)
nw, errw := w.Write(buf[:n])
- curPos += nw
+ r.offset += int64(nw)
if errw == nil && nw != n {
- errw = errors.New("bug: Write(p) returned (n, nil), where n != len(p)")
+ panic("BUG: Write(p) returned (n, nil), where n != len(p)")
}
if err == nil {
err = errw
}
}
copyBufPool.Put(bufv)
-
if err == io.EOF {
err = nil
}
- return int64(curPos - r.startPos), err
+ return r.offset, err
}
func (h *fsHandler) cleanCache(pendingFiles []*fsFile) []*fsFile {
@@ -916,7 +951,14 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) {
return
}
- r, err := ff.NewReader()
+ var r io.Reader
+ var err error
+ if h.acceptByteRange && len(byteRange) > 0 {
+ r, err = ff.NewRangeReader()
+ } else {
+ r, err = ff.NewReader()
+ }
+
if err != nil {
ctx.Logger().Printf("cannot obtain file reader for path=%q: %v", path, err)
ctx.Error("Internal Server Error", StatusInternalServerError)
@@ -937,23 +979,27 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) {
if h.acceptByteRange {
hdr.setNonSpecial(strAcceptRanges, strBytes)
if len(byteRange) > 0 {
- startPos, endPos, err := ParseByteRange(byteRange, contentLength)
+ staList, endList, err := ParseByteRanges(byteRange, contentLength)
if err != nil {
- _ = r.(io.Closer).Close()
- ctx.Logger().Printf("cannot parse byte range %q for path=%q: %v", byteRange, path, err)
+ r.(io.Closer).Close()
+ ctx.Logger().Printf("Cannot parse byte range %q for path=%q,error=%s", byteRange, path, err)
ctx.Error("Range Not Satisfiable", StatusRequestedRangeNotSatisfiable)
return
}
- if err = r.(byteRangeUpdater).UpdateByteRange(startPos, endPos); err != nil {
- _ = r.(io.Closer).Close()
- ctx.Logger().Printf("cannot seek byte range %q for path=%q: %v", byteRange, path, err)
+ if err = r.(byteRangeHandler).ByteRangeUpdate(staList, endList); err != nil {
+ r.(io.Closer).Close()
+ ctx.Logger().Printf("Cannot seek byte range %q for path=%q, error=%s", byteRange, path, err)
ctx.Error("Internal Server Error", StatusInternalServerError)
return
}
-
- hdr.SetContentRange(startPos, endPos, contentLength)
- contentLength = endPos - startPos + 1
+ switch {
+ case len(staList) == 1:
+ hdr.SetContentRange(staList[0], endList[0], contentLength)
+ case len(staList) > 1:
+ hdr.SetContentType(fmt.Sprintf("multipart/byteranges; boundary=%s", r.(byteRangeHandler).Boundary()))
+ }
+ contentLength = r.(byteRangeHandler).ByteRangeLength()
statusCode = StatusPartialContent
}
}
@@ -980,62 +1026,63 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) {
ctx.SetStatusCode(statusCode)
}
-type byteRangeUpdater interface {
- UpdateByteRange(startPos, endPos int) error
-}
-
// ParseByteRange parses 'Range: bytes=...' header value.
//
// It follows https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 .
-func ParseByteRange(byteRange []byte, contentLength int) (startPos, endPos int, err error) {
+func ParseByteRanges(byteRange []byte, contentLength int) (startPos, endPos []int, err error) {
b := byteRange
if !bytes.HasPrefix(b, strBytes) {
- return 0, 0, fmt.Errorf("unsupported range units: %q. Expecting %q", byteRange, strBytes)
+ return startPos, endPos, fmt.Errorf("unsupported range units: %q. Expecting %q", byteRange, strBytes)
}
-
b = b[len(strBytes):]
if len(b) == 0 || b[0] != '=' {
- return 0, 0, fmt.Errorf("missing byte range in %q", byteRange)
+ return startPos, endPos, fmt.Errorf("missing byte range in %q", byteRange)
}
b = b[1:]
- n := bytes.IndexByte(b, '-')
- if n < 0 {
- return 0, 0, fmt.Errorf("missing the end position of byte range in %q", byteRange)
- }
+ var n, sta, end, v int
- if n == 0 {
- v, err := ParseUint(b[n+1:])
- if err != nil {
- return 0, 0, err
+ for _, ra := range bytes.Split(b, s2b(",")) {
+ n = bytes.IndexByte(ra, '-')
+ if n < 0 {
+ return startPos, endPos, fmt.Errorf("missing the end position of byte range in %q", byteRange)
}
- startPos := contentLength - v
- if startPos < 0 {
- startPos = 0
+ if n == 0 {
+ v, err = ParseUint(ra[n+1:])
+ if err != nil {
+ return startPos, endPos, err
+ }
+ sta = contentLength - v
+ if sta < 0 {
+ sta = 0
+ }
+ startPos = append(startPos, sta)
+ endPos = append(endPos, contentLength-1)
+ continue
}
- return startPos, contentLength - 1, nil
- }
-
- if startPos, err = ParseUint(b[:n]); err != nil {
- return 0, 0, err
- }
- if startPos >= contentLength {
- return 0, 0, fmt.Errorf("the start position of byte range cannot exceed %d. byte range %q", contentLength-1, byteRange)
- }
-
- b = b[n+1:]
- if len(b) == 0 {
- return startPos, contentLength - 1, nil
- }
-
- if endPos, err = ParseUint(b); err != nil {
- return 0, 0, err
- }
- if endPos >= contentLength {
- endPos = contentLength - 1
- }
- if endPos < startPos {
- return 0, 0, fmt.Errorf("the start position of byte range cannot exceed the end position. byte range %q", byteRange)
+ if sta, err = ParseUint(ra[:n]); err != nil {
+ return startPos, endPos, err
+ }
+ if sta >= contentLength {
+ return startPos, endPos, fmt.Errorf("the start position of byte range cannot exceed %d. byte range %q", contentLength-1, byteRange)
+ }
+ ra = ra[n+1:]
+ if len(ra) == 0 {
+ startPos = append(startPos, sta)
+ endPos = append(endPos, contentLength-1)
+ continue
+ }
+ if end, err = ParseUint(ra); err != nil {
+ return startPos, endPos, err
+ }
+ if end >= contentLength {
+ end = contentLength - 1
+ }
+ if end < sta {
+ return startPos, endPos, fmt.Errorf("the start position of byte range cannot exceed the end position. byte range %q", byteRange)
+ }
+ startPos = append(startPos, sta)
+ endPos = append(endPos, end)
}
return startPos, endPos, nil
}
@@ -1451,3 +1498,499 @@ func getFileLock(absPath string) *sync.Mutex {
filelock := v.(*sync.Mutex)
return filelock
}
+
+type byteRangeHandler interface {
+ ByteRangeUpdate(sta, end []int) error
+ ByteRangeLength() int
+ Boundary() string
+ IsMultiRange() bool
+}
+
+type smallRangeReader struct {
+ ff *fsFile
+ sta []int
+ end []int
+
+ // handle multi range
+ buf []byte
+ hf bool
+ he bool
+ hasCurRangeBodyHeader bool
+ cRange int
+ boundary string
+}
+
+func (r *smallRangeReader) ByteRangeUpdate(sta, end []int) error {
+ for i := range sta {
+ r.sta = append(r.sta, sta[i])
+ r.end = append(r.end, end[i]+1)
+ }
+ return nil
+}
+
+func (r *smallRangeReader) IsMultiRange() bool {
+ return len(r.sta) > 1
+}
+
+func (r *smallRangeReader) Boundary() string {
+ return r.boundary
+}
+
+func (r *smallRangeReader) ByteRangeLength() int {
+ if !r.IsMultiRange() {
+ return r.end[0] - r.sta[0]
+ }
+ return multiRangeLength(r.sta, r.end, r.ff.contentLength, r.ff.contentType, r.boundary)
+}
+
+func (r *smallRangeReader) Close() error {
+ ff := r.ff
+ ff.decReadersCount()
+ r.ff = nil
+
+ r.sta, r.end, r.buf = r.sta[:0], r.end[:0], r.buf[:0]
+ r.cRange = 0
+ r.hf = true
+ r.he = false
+ r.hasCurRangeBodyHeader = false
+ r.boundary = ""
+ ff.h.smallRangeReaderPool.Put(r)
+ return nil
+}
+
+func (r *smallRangeReader) Read(p []byte) (int, error) {
+ ff := r.ff
+ var err error
+ cPos, cLen, n := 0, 0, 0
+
+ if r.cRange >= len(r.sta) && len(r.buf) == 0 && r.he {
+ return 0, io.EOF
+ }
+
+ if r.IsMultiRange() {
+ cLen = len(p[cPos:])
+ n = copy(p[cPos:], r.buf)
+ if len(r.buf) > cLen {
+ r.buf = r.buf[n:]
+ return n, nil
+ }
+ cPos += n
+ r.buf = r.buf[:0]
+ }
+
+ for i := r.cRange; i < len(r.sta); i++ {
+ if r.sta[i] >= r.end[i] {
+ continue
+ }
+
+ if r.IsMultiRange() && !r.hasCurRangeBodyHeader {
+ r.hasCurRangeBodyHeader = true
+ multiRangeBodyHeader(&r.buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, r.hf)
+ r.hf = false
+ cLen = len(p[cPos:])
+ n = copy(p[cPos:], r.buf)
+ cPos += n
+ if len(r.buf) > cLen {
+ r.buf = r.buf[n:]
+ return cPos, nil
+ }
+ r.buf = r.buf[:0]
+ }
+
+ cLen = len(p[cPos:])
+
+ // handle file
+ if ff.f != nil {
+ if r.end[i]-r.sta[i] > cLen {
+ n, err = ff.f.ReadAt(p[cPos:], int64(r.sta[i]))
+ r.sta[i] += n
+ return cPos + n, err
+ }
+
+ // todo use pool
+ cBody := make([]byte, r.end[i]-r.sta[i])
+ n, err = ff.f.ReadAt(cBody, int64(r.sta[i]))
+ if err != nil && err != io.EOF {
+ return cPos + n, err
+ }
+ n = copy(p[cPos:], cBody)
+ } else {
+ // handle dir
+ n = copy(p[cPos:], ff.dirIndex[r.sta[i]:r.end[i]])
+ if r.end[i]-r.sta[i] > cLen {
+ r.sta[i] += n
+ return cPos + n, nil
+ }
+ }
+
+ cPos += n
+ r.cRange = i + 1
+ r.hasCurRangeBodyHeader = false
+ }
+
+ if r.IsMultiRange() && !r.he {
+ multiRangeBodyEnd(&r.buf, r.boundary)
+ r.he = true
+ n = copy(p[cPos:], r.buf)
+ if len(r.buf) > len(p[cPos:]) {
+ r.buf = r.buf[n:]
+ return cPos + n, nil
+ }
+ cPos += n
+ r.buf = r.buf[:0]
+ }
+ return cPos, io.EOF
+}
+
+func (r *smallRangeReader) WriteTo(w io.Writer) (int64, error) {
+ ff := r.ff
+ var err error
+ cPos, cLen, n, sum := 0, 0, 0, 0
+ bufv := copyBufPool.Get()
+ buf := bufv.([]byte)
+ hf := true
+ if ff.f == nil {
+ for i := range r.sta {
+
+ if r.IsMultiRange() {
+ buf = buf[:0]
+ if i > 0 {
+ hf = false
+ }
+ multiRangeBodyHeader(&buf, r.sta[i], r.end[i], ff.contentLength, ff.contentType, r.boundary, hf)
+ nw, errw := w.Write(buf)
+ if errw == nil && nw != len(buf) {
+ panic("BUG: buf returned(n, nil),where n != len(buf)")
+ }
+ sum += nw
+ if errw != nil {
+ return int64(sum), errw
+ }
+ }
+
+ n, err = w.Write(ff.dirIndex[r.sta[i]:r.end[i]])
+ sum += n
+ if err != nil {
+ return int64(sum), err
+ }
+ }
+ if r.IsMultiRange() {
+ buf = buf[:0]
+ multiRangeBodyEnd(&buf, r.boundary)
+ nw, errw := w.Write(buf)
+ if errw == nil && nw != len(buf) {
+ panic("BUG: buf returned (n, nil), where n != len(buf)")
+ }
+ sum += nw
+ err = errw
+ }
+ return int64(sum), err
+ }
+
+ if rf, ok := w.(io.ReaderFrom); ok {
+ return rf.ReadFrom(r)
+ }
+
+ for i := range r.sta {
+
+ if r.IsMultiRange() {
+ buf = buf[:0]
+ if i > 0 {
+ hf = false
+ }
+ multiRangeBodyHeader(&buf, r.sta[i], r.end[i], ff.contentLength, ff.contentType, r.boundary, hf)
+ nw, errw := w.Write(buf)
+ if errw == nil && nw != len(buf) {
+ panic("BUG: buf returned(n, nil),where n != len(buf)")
+ }
+ sum += nw
+ if errw != nil {
+ return int64(sum), errw
+ }
+ }
+
+ cPos = r.sta[i]
+ buf = buf[:4096]
+ for err == nil {
+ cLen = r.end[i] - cPos
+ if cLen <= 0 {
+ break
+ }
+ if len(buf) > cLen {
+ buf = buf[:cLen]
+ }
+ n, err = ff.f.ReadAt(buf, int64(cPos))
+ nw, errw := w.Write(buf[:n])
+ cPos += nw
+ sum += nw
+ if errw == nil && nw != n {
+ panic("BUG: Write(p) returned (n, nil), where n != len(p)")
+ }
+ if err == nil {
+ err = errw
+ }
+ }
+ }
+ if err == io.EOF {
+ err = nil
+ }
+ if r.IsMultiRange() {
+ buf = buf[:0]
+ multiRangeBodyEnd(&buf, r.boundary)
+ nw, errw := w.Write(buf)
+ if errw == nil && nw != len(buf) {
+ panic("BUG: buf returned (n, nil), where n != len(buf)")
+ }
+ sum += nw
+ if err == nil {
+ err = errw
+ }
+ }
+ return int64(sum), err
+}
+
+type bigRangeReader struct {
+ ff *fsFile
+ f *os.File
+ sta []int
+ end []int
+
+ // handle multi range
+ buf []byte
+ hf bool
+ he bool
+ hasCurRangeBodyHeader bool
+ cRange int
+ boundary string
+}
+
+func (r *bigRangeReader) ByteRangeUpdate(sta, end []int) error {
+ for i := range sta {
+ r.sta = append(r.sta, sta[i])
+ r.end = append(r.end, end[i]+1)
+ }
+ return nil
+}
+
+func (r *bigRangeReader) IsMultiRange() bool {
+ return len(r.sta) > 1
+}
+
+func (r *bigRangeReader) Boundary() string {
+ return r.boundary
+}
+
+func (r *bigRangeReader) ByteRangeLength() int {
+ if !r.IsMultiRange() {
+ return r.end[0] - r.sta[0]
+ }
+ return multiRangeLength(r.sta, r.end, r.ff.contentLength, r.ff.contentType, r.boundary)
+}
+
+func (r *bigRangeReader) Close() error {
+ n, err := r.f.Seek(0, 0)
+ if err == nil {
+ if n != 0 {
+ panic("BUG: File.Seek(0,0) returned (non-zero, nil)")
+ }
+ ff := r.ff
+ ff.rangeBigFilesLock.Lock()
+ r.end, r.sta, r.buf = r.end[:0], r.sta[:0], r.buf[:0]
+ r.cRange = 0
+ r.hf = true
+ r.he = false
+ r.hasCurRangeBodyHeader = false
+
+ ff.rangeBigFiles = append(ff.rangeBigFiles, r)
+ ff.rangeBigFilesLock.Unlock()
+ } else {
+ r.f.Close()
+ }
+ r.ff.decReadersCount()
+ return err
+}
+
+func (r *bigRangeReader) Read(p []byte) (int, error) {
+ if r.cRange >= len(r.sta) && len(r.buf) == 0 && r.he {
+ return 0, io.EOF
+ }
+
+ ff := r.ff
+ var err error
+ cPos, cLen, n := 0, 0, 0
+
+ if r.IsMultiRange() {
+ cLen = len(p[cPos:])
+ n = copy(p[cPos:], r.buf)
+ if len(r.buf) > cLen {
+ r.buf = r.buf[n:]
+ return n, nil
+ }
+ cPos += n
+ r.buf = r.buf[:0]
+ }
+
+ for i := r.cRange; i < len(r.sta); i++ {
+ if r.sta[i] >= r.end[i] {
+ continue
+ }
+
+ if r.IsMultiRange() && !r.hasCurRangeBodyHeader {
+ r.hasCurRangeBodyHeader = true
+ multiRangeBodyHeader(&r.buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, r.hf)
+ r.hf = false
+ cLen = len(p[cPos:])
+ n = copy(p[cPos:], r.buf)
+ cPos += n
+ if len(r.buf) > cLen {
+ r.buf = r.buf[n:]
+ return cPos, nil
+ }
+ r.buf = r.buf[:0]
+ }
+
+ cLen = len(p[cPos:])
+ if r.end[i]-r.sta[i] > cLen {
+ n, err = r.f.ReadAt(p[cPos:], int64(r.sta[i]))
+ r.sta[i] += n
+ return cPos + n, err
+ }
+
+ // todo use pool
+ cBody := make([]byte, r.end[i]-r.sta[i])
+ n, err = ff.f.ReadAt(cBody, int64(r.sta[i]))
+ if err != nil && err != io.EOF {
+ return cPos + n, err
+ }
+ n = copy(p[cPos:], cBody)
+ cPos += n
+ r.cRange = i + 1
+ r.hasCurRangeBodyHeader = false
+ }
+
+ if r.IsMultiRange() && !r.he {
+ multiRangeBodyEnd(&r.buf, r.boundary)
+ r.he = true
+ n = copy(p[cPos:], r.buf)
+ if len(r.buf) > len(p[cPos:]) {
+ r.buf = r.buf[n:]
+ return cPos + n, nil
+ }
+ cPos += n
+ r.buf = r.buf[:0]
+ }
+ return cPos, io.EOF
+}
+
+func (r *bigRangeReader) WriteTo(w io.Writer) (int64, error) {
+ if rf, ok := w.(io.ReaderFrom); ok {
+ return rf.ReadFrom(r)
+ }
+
+ var err error
+ cPos, cLen, n, sum := 0, 0, 0, 0
+ hf := true
+
+ bufv := copyBufPool.Get()
+ buf := bufv.([]byte)
+
+ for i := range r.sta {
+
+ if r.IsMultiRange() {
+ buf = buf[:0]
+ if i > 0 {
+ hf = false
+ }
+ multiRangeBodyHeader(&buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, hf)
+ nw, errw := w.Write(buf)
+ if errw == nil && nw != len(buf) {
+ panic("BUG: buf returned(n, nil),where n != len(buf)")
+ }
+ sum += nw
+ if errw != nil {
+ return int64(sum), errw
+ }
+ }
+
+ cPos = r.sta[i]
+ buf = buf[:4096]
+ for err == nil {
+ cLen = r.end[i] - cPos
+ if cLen <= 0 {
+ break
+ }
+ if len(buf) > cLen {
+ buf = buf[:cLen]
+ }
+ n, err = r.f.ReadAt(buf, int64(cPos))
+ nw, errw := w.Write(buf[:n])
+ cPos += nw
+ sum += nw
+ if errw == nil && nw != n {
+ panic("BUG: Write(p) returned (n, nil), where n != len(p)")
+ }
+ if err == nil {
+ err = errw
+ }
+ }
+ }
+ if err == io.EOF {
+ err = nil
+ }
+ if r.IsMultiRange() {
+ buf = buf[:0]
+ multiRangeBodyEnd(&buf, r.boundary)
+ nw, errw := w.Write(buf)
+ if err == nil && nw != len(buf) {
+ panic("BUG: buf returned (n, nil), where n != len(buf)")
+ }
+ sum += nw
+ err = errw
+ }
+ return int64(sum), err
+}
+
+func multiRangeLength(sta, end []int, cl int, ct, bd string) int {
+ sum := 0
+ hf := true
+ bufv := copyBufPool.Get()
+ buf := bufv.([]byte)
+ for i := range sta {
+ buf = buf[:0]
+ if i > 0 {
+ hf = false
+ }
+ multiRangeBodyHeader(&buf, sta[i], end[i], cl, ct, bd, hf)
+ sum += len(buf)
+ sum += end[i] - sta[i]
+ }
+ buf = buf[:0]
+ multiRangeBodyEnd(&buf, bd)
+ sum += len(buf)
+ copyBufPool.Put(bufv)
+ return sum
+}
+
+func multiRangeBodyHeader(b *[]byte, sta, end, size int, ct, boundary string, hf bool) {
+ if !hf {
+ *b = append(*b, s2b("\r\n")...)
+ }
+ *b = append(*b, s2b(fmt.Sprintf("--%s\r\n", boundary))...)
+ *b = append(*b, s2b(fmt.Sprintf("%s: %s\r\n", HeaderContentRange,
+ fmt.Sprintf("bytes %d-%d/%d", sta, end-1, size)))...)
+ *b = append(*b, s2b(fmt.Sprintf("%s: %s\r\n", HeaderContentType, ct))...)
+ *b = append(*b, s2b("\r\n")...)
+}
+
+func multiRangeBodyEnd(b *[]byte, boundary string) {
+ *b = append(*b, s2b(fmt.Sprintf("\r\n--%s--\r\n", boundary))...)
+}
+
+func randomBoundary() string {
+ var buf [30]byte
+ _, err := io.ReadFull(rand.Reader, buf[:])
+ if err != nil {
+ panic(err)
+ }
+ return fmt.Sprintf("%x", buf[:])
+}