about summary refs log tree commit diff
path: root/workerpool_test.go
diff options
context:
space:
mode:
author Aliaksandr Valialkin <valyala@gmail.com> 2016-03-31 14:41:27 +0300
committer Aliaksandr Valialkin <valyala@gmail.com> 2016-03-31 14:41:27 +0300
commit 9f20a62336ee632d92f75b7433a57ae901308299 (patch)
tree 978c36db227c35d206433aeca0fec1ae6d9e9770 /workerpool_test.go
parent workerpool: bugfix: do not return workerChan to ready pool on panic (diff)
downloadfasthttp-9f20a62336ee632d92f75b7433a57ae901308299.tar.gz
fasthttp-9f20a62336ee632d92f75b7433a57ae901308299.tar.bz2
fasthttp-9f20a62336ee632d92f75b7433a57ae901308299.zip
added workerpool tests
Diffstat (limited to 'workerpool_test.go')
-rw-r--r--  workerpool_test.go  266
1 files changed, 266 insertions, 0 deletions
diff --git a/workerpool_test.go b/workerpool_test.go
new file mode 100644
index 0000000..dbe230c
--- /dev/null
+++ b/workerpool_test.go
@@ -0,0 +1,266 @@
+package fasthttp
+
+import (
+ "fmt"
+ "io/ioutil"
+ "net"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/valyala/fasthttp/fasthttputil"
+)
+
+func TestWorkerPoolStartStopSerial(t *testing.T) {
+ testWorkerPoolStartStop(t)
+}
+
+func TestWorkerPoolStartStopConcurrent(t *testing.T) {
+ concurrency := 10
+ ch := make(chan struct{}, concurrency)
+ for i := 0; i < concurrency; i++ {
+ go func() {
+ testWorkerPoolStartStop(t)
+ ch <- struct{}{}
+ }()
+ }
+ for i := 0; i < concurrency; i++ {
+ select {
+ case <-ch:
+ case <-time.After(time.Second):
+ t.Fatalf("timeout")
+ }
+ }
+}
+
+func testWorkerPoolStartStop(t *testing.T) {
+ wp := &workerPool{
+ WorkerFunc: func(conn net.Conn) error { return nil },
+ MaxWorkersCount: 10,
+ Logger: defaultLogger,
+ }
+ for i := 0; i < 10; i++ {
+ wp.Start()
+ wp.Stop()
+ }
+}
+
+func TestWorkerPoolMaxWorkersCountSerial(t *testing.T) {
+ testWorkerPoolMaxWorkersCountMulti(t)
+}
+
+func TestWorkerPoolMaxWorkersCountConcurrent(t *testing.T) {
+ concurrency := 10
+ ch := make(chan struct{}, concurrency)
+ for i := 0; i < concurrency; i++ {
+ go func() {
+ testWorkerPoolMaxWorkersCountMulti(t)
+ ch <- struct{}{}
+ }()
+ }
+ for i := 0; i < concurrency; i++ {
+ select {
+ case <-ch:
+ case <-time.After(time.Second):
+ t.Fatalf("timeout")
+ }
+ }
+}
+
+func testWorkerPoolMaxWorkersCountMulti(t *testing.T) {
+ for i := 0; i < 10; i++ {
+ testWorkerPoolMaxWorkersCount(t)
+ }
+}
+
+func testWorkerPoolMaxWorkersCount(t *testing.T) {
+ ready := make(chan struct{})
+ wp := &workerPool{
+ WorkerFunc: func(conn net.Conn) error {
+ buf := make([]byte, 100)
+ n, err := conn.Read(buf)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ buf = buf[:n]
+ if string(buf) != "foobar" {
+ t.Fatalf("unexpected data read: %q. Expecting %q", buf, "foobar")
+ }
+ if _, err = conn.Write([]byte("baz")); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+
+ <-ready
+
+ return nil
+ },
+ MaxWorkersCount: 10,
+ Logger: defaultLogger,
+ }
+ wp.Start()
+
+ ln := fasthttputil.NewInmemoryListener()
+
+ clientCh := make(chan struct{}, wp.MaxWorkersCount)
+ for i := 0; i < wp.MaxWorkersCount; i++ {
+ go func() {
+ conn, err := ln.Dial()
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if _, err = conn.Write([]byte("foobar")); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ data, err := ioutil.ReadAll(conn)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if string(data) != "baz" {
+ t.Fatalf("unexpected value read: %q. Expecting %q", data, "baz")
+ }
+ if err = conn.Close(); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ clientCh <- struct{}{}
+ }()
+ }
+
+ for i := 0; i < wp.MaxWorkersCount; i++ {
+ conn, err := ln.Accept()
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if !wp.Serve(conn) {
+ t.Fatalf("worker pool must have enough workers to serve the conn")
+ }
+ }
+
+ go func() {
+ if _, err := ln.Dial(); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ }()
+ conn, err := ln.Accept()
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ for i := 0; i < 5; i++ {
+ if wp.Serve(conn) {
+ t.Fatalf("worker pool must be full")
+ }
+ }
+ if err = conn.Close(); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+
+ close(ready)
+
+ for i := 0; i < wp.MaxWorkersCount; i++ {
+ select {
+ case <-clientCh:
+ case <-time.After(time.Second):
+ t.Fatalf("timeout")
+ }
+ }
+
+ if err := ln.Close(); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ wp.Stop()
+}
+
+func TestWorkerPoolPanicErrorSerial(t *testing.T) {
+ testWorkerPoolPanicErrorMulti(t)
+}
+
+func TestWorkerPoolPanicErrorConcurrent(t *testing.T) {
+ concurrency := 10
+ ch := make(chan struct{}, concurrency)
+ for i := 0; i < concurrency; i++ {
+ go func() {
+ testWorkerPoolPanicErrorMulti(t)
+ ch <- struct{}{}
+ }()
+ }
+ for i := 0; i < concurrency; i++ {
+ select {
+ case <-ch:
+ case <-time.After(time.Second):
+ t.Fatalf("timeout")
+ }
+ }
+}
+
+func testWorkerPoolPanicErrorMulti(t *testing.T) {
+ var globalCount uint64
+ wp := &workerPool{
+ WorkerFunc: func(conn net.Conn) error {
+ count := atomic.AddUint64(&globalCount, 1)
+ switch count % 3 {
+ case 0:
+ panic("foobar")
+ case 1:
+ return fmt.Errorf("fake error")
+ }
+ return nil
+ },
+ MaxWorkersCount: 1000,
+ Logger: &customLogger{},
+ }
+
+ for i := 0; i < 10; i++ {
+ testWorkerPoolPanicError(t, wp)
+ }
+}
+
+func testWorkerPoolPanicError(t *testing.T, wp *workerPool) {
+ wp.Start()
+
+ ln := fasthttputil.NewInmemoryListener()
+
+ clientsCount := 10
+ clientCh := make(chan struct{}, clientsCount)
+ for i := 0; i < clientsCount; i++ {
+ go func() {
+ conn, err := ln.Dial()
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ data, err := ioutil.ReadAll(conn)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if len(data) > 0 {
+ t.Fatalf("unexpected data read: %q. Expecting empty data", data)
+ }
+ if err = conn.Close(); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ clientCh <- struct{}{}
+ }()
+ }
+
+ for i := 0; i < clientsCount; i++ {
+ conn, err := ln.Accept()
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if !wp.Serve(conn) {
+ t.Fatalf("worker pool mustn't be full")
+ }
+ }
+
+ for i := 0; i < clientsCount; i++ {
+ select {
+ case <-clientCh:
+ case <-time.After(time.Second):
+ t.Fatalf("timeout")
+ }
+ }
+
+ if err := ln.Close(); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+
+ wp.Stop()
+}