aboutsummaryrefslogtreecommitdiff
path: root/fasthttputil
diff options
context:
space:
mode:
authorGravatar Aliaksandr Valialkin <valyala@gmail.com> 2016-02-04 16:29:26 +0200
committerGravatar Aliaksandr Valialkin <valyala@gmail.com> 2016-02-04 16:29:26 +0200
commita9dcd0efd01055ac6381a5b9b2dc6fac08a09686 (patch)
tree05e177b86a84645e09fa936de3deaaaa302ef49a /fasthttputil
parentfasthttputil: export PipeConns (diff)
downloadfasthttp-a9dcd0efd01055ac6381a5b9b2dc6fac08a09686.tar.gz
fasthttp-a9dcd0efd01055ac6381a5b9b2dc6fac08a09686.tar.bz2
fasthttp-a9dcd0efd01055ac6381a5b9b2dc6fac08a09686.zip
fasthttputil: added pipe tests
Diffstat (limited to 'fasthttputil')
-rw-r--r--fasthttputil/pipe.go9
-rw-r--r--fasthttputil/pipe_test.go146
2 files changed, 152 insertions, 3 deletions
diff --git a/fasthttputil/pipe.go b/fasthttputil/pipe.go
index 1e91600..6d01529 100644
--- a/fasthttputil/pipe.go
+++ b/fasthttputil/pipe.go
@@ -159,10 +159,13 @@ var errWouldBlock = errors.New("would block")
func (c *pipeConn) Close() error {
c.wlock.Lock()
- if !c.wclosed {
- c.wclosed = true
- c.w.ch <- nil
+ if c.wclosed {
+ c.wlock.Unlock()
+ return errors.New("connection already closed")
}
+
+ c.wclosed = true
+ c.w.ch <- nil
c.wlock.Unlock()
c.pc.release()
diff --git a/fasthttputil/pipe_test.go b/fasthttputil/pipe_test.go
new file mode 100644
index 0000000..b4bab0e
--- /dev/null
+++ b/fasthttputil/pipe_test.go
@@ -0,0 +1,146 @@
+package fasthttputil
+
+import (
+ "fmt"
+ "io"
+ "net"
+ "testing"
+ "time"
+)
+
+func TestPipeConnsReadWriteSerial(t *testing.T) {
+ testPipeConnsReadWriteSerial(t)
+}
+
+func TestPipeConnsReadWriteConcurrent(t *testing.T) {
+ testConcurrency(t, 10, testPipeConnsReadWriteSerial)
+}
+
+func testPipeConnsReadWriteSerial(t *testing.T) {
+ pc := NewPipeConns()
+ testPipeConnsReadWrite(t, pc.Conn1(), pc.Conn2())
+
+ pc = NewPipeConns()
+ testPipeConnsReadWrite(t, pc.Conn2(), pc.Conn1())
+}
+
+func testPipeConnsReadWrite(t *testing.T, c1, c2 net.Conn) {
+ defer c1.Close()
+ defer c2.Close()
+
+ var buf [32]byte
+ for i := 0; i < 10; i++ {
+ // The first write
+ s1 := fmt.Sprintf("foo_%d", i)
+ n, err := c1.Write([]byte(s1))
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if n != len(s1) {
+ t.Fatalf("unexpected number of bytes written: %d. Expecting %d", n, len(s1))
+ }
+
+ // The second write
+ s2 := fmt.Sprintf("bar_%d", i)
+ n, err = c1.Write([]byte(s2))
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if n != len(s2) {
+ t.Fatalf("unexpected number of bytes written: %d. Expecting %d", n, len(s2))
+ }
+
+ // Read data written above in two writes
+ s := s1 + s2
+ n, err = c2.Read(buf[:])
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if n != len(s) {
+ t.Fatalf("unexpected number of bytes read: %d. Expecting %d", n, len(s))
+ }
+ if string(buf[:n]) != s {
+ t.Fatalf("unexpected string read: %q. Expecting %q", buf[:n], s)
+ }
+ }
+}
+
+func TestPipeConnsCloseSerial(t *testing.T) {
+ testPipeConnsCloseSerial(t)
+}
+
+func TestPipeConnsCloseConcurrent(t *testing.T) {
+ testConcurrency(t, 10, testPipeConnsCloseSerial)
+}
+
+func testPipeConnsCloseSerial(t *testing.T) {
+ pc := NewPipeConns()
+ testPipeConnsClose(t, pc.Conn1(), pc.Conn2())
+
+ pc = NewPipeConns()
+ testPipeConnsClose(t, pc.Conn2(), pc.Conn1())
+}
+
+func testPipeConnsClose(t *testing.T, c1, c2 net.Conn) {
+ if err := c1.Close(); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ var buf [10]byte
+
+ // attempt writing to closed conn
+ for i := 0; i < 10; i++ {
+ n, err := c1.Write(buf[:])
+ if err == nil {
+ t.Fatalf("expecting error")
+ }
+ if n != 0 {
+ t.Fatalf("unexpected number of bytes written: %d. Expecting 0", n)
+ }
+ }
+
+ // attempt reading from closed conn
+ for i := 0; i < 10; i++ {
+ n, err := c2.Read(buf[:])
+ if err == nil {
+ t.Fatalf("expecting error")
+ }
+ if err != io.EOF {
+ t.Fatalf("unexpected error: %s. Expecting %s", err, io.EOF)
+ }
+ if n != 0 {
+ t.Fatalf("unexpected number of bytes read: %d. Expecting 0", n)
+ }
+ }
+
+ if err := c2.Close(); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+
+ // attempt closing already closed conns
+ for i := 0; i < 10; i++ {
+ if err := c1.Close(); err == nil {
+ t.Fatalf("expecting error")
+ }
+ if err := c2.Close(); err == nil {
+ t.Fatalf("expecting error")
+ }
+ }
+}
+
+func testConcurrency(t *testing.T, concurrency int, f func(*testing.T)) {
+ ch := make(chan struct{}, concurrency)
+ for i := 0; i < concurrency; i++ {
+ go func() {
+ f(t)
+ ch <- struct{}{}
+ }()
+ }
+
+ for i := 0; i < concurrency; i++ {
+ select {
+ case <-ch:
+ case <-time.After(time.Second):
+ t.Fatalf("timeout")
+ }
+ }
+}