diff options
author | Aliaksandr Valialkin <valyala@gmail.com> | 2015-11-20 14:46:12 +0200 |
---|---|---|
committer | Aliaksandr Valialkin <valyala@gmail.com> | 2015-11-20 14:46:12 +0200 |
commit | db29399a7d67a7454a0553ea9fd94c33fbf5988c (patch) | |
tree | 333a102698cfc2ddf55745c82590d38ce8b2a641 /reuseport | |
parent | Do not allocate memory on BodyWriter() call (diff) | |
download | fasthttp-db29399a7d67a7454a0553ea9fd94c33fbf5988c.tar.gz fasthttp-db29399a7d67a7454a0553ea9fd94c33fbf5988c.tar.bz2 fasthttp-db29399a7d67a7454a0553ea9fd94c33fbf5988c.zip |
Added net.Listener with reuseport support
Diffstat (limited to 'reuseport')
-rw-r--r-- | reuseport/LICENSE | 21 | ||||
-rw-r--r-- | reuseport/reuseport.go | 91 | ||||
-rw-r--r-- | reuseport/reuseport_bsd.go | 7 | ||||
-rw-r--r-- | reuseport/reuseport_linux.go | 3 | ||||
-rw-r--r-- | reuseport/reuseport_test.go | 98 |
5 files changed, 220 insertions, 0 deletions
diff --git a/reuseport/LICENSE b/reuseport/LICENSE new file mode 100644 index 0000000..5f25159 --- /dev/null +++ b/reuseport/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Max Riveiro + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.
\ No newline at end of file diff --git a/reuseport/reuseport.go b/reuseport/reuseport.go new file mode 100644 index 0000000..dba030c --- /dev/null +++ b/reuseport/reuseport.go @@ -0,0 +1,91 @@ +// Package provides TCP net.Listener with SO_REUSEPORT support. +// +// SO_REUSEPORT allows linear scaling server performance on multi-CPU servers. +// See https://www.nginx.com/blog/socket-sharding-nginx-release-1-9-1/ for more details :) +// +// Package reuseport is based on https://github.com/kavu/go_reuseport . +package reuseport + +import ( + "errors" + "fmt" + "net" + "os" + "syscall" +) + +func getSockaddr(network, addr string) (sa syscall.Sockaddr, soType int, err error) { + // TODO: add support for tcp and tcp6 networks. + + if network != "tcp4" { + return nil, -1, errors.New("only tcp4 network is supported") + } + + tcpAddr, err := net.ResolveTCPAddr(network, addr) + if err != nil { + return nil, -1, err + } + + var sa4 syscall.SockaddrInet4 + sa4.Port = tcpAddr.Port + copy(sa4.Addr[:], tcpAddr.IP.To4()) + return &sa4, syscall.AF_INET, nil +} + +// ErrNoReusePort is returned if the OS doesn't support SO_REUSEPORT. +type ErrNoReusePort struct { + err error +} + +func (e *ErrNoReusePort) Error() string { + return fmt.Sprintf("The OS doesn't support SO_REUSEPORT: %s", e.err) +} + +// NewListener returns TCP listener with SO_REUSEPORT option set. +// +// Only tcp4 network is supported. +// +// ErrNoReusePort error is returned if the system doesn't support SO_REUSEPORT. +func NewListener(network, addr string) (l net.Listener, err error) { + var ( + soType, fd int + file *os.File + sockaddr syscall.Sockaddr + ) + + if sockaddr, soType, err = getSockaddr(network, addr); err != nil { + return nil, err + } + + if fd, err = syscall.Socket(soType, syscall.SOCK_STREAM, syscall.IPPROTO_TCP); err != nil { + return nil, err + } + + if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, reusePort, 1); err != nil { + syscall.Close(fd) + return nil, &ErrNoReusePort{err} + } + + if err = syscall.Bind(fd, sockaddr); err != nil { + syscall.Close(fd) + return nil, err + } + + if err = syscall.Listen(fd, syscall.SOMAXCONN); err != nil { + syscall.Close(fd) + return nil, err + } + + name := fmt.Sprintf("reuseport.%d.%s.%s", os.Getpid(), network, addr) + file = os.NewFile(uintptr(fd), name) + if l, err = net.FileListener(file); err != nil { + file.Close() + return nil, err + } + + if err = file.Close(); err != nil { + return nil, err + } + + return l, err +} diff --git a/reuseport/reuseport_bsd.go b/reuseport/reuseport_bsd.go new file mode 100644 index 0000000..89b44cb --- /dev/null +++ b/reuseport/reuseport_bsd.go @@ -0,0 +1,7 @@ +// +build darwin dragonfly freebsd netbsd openbsd + +package reuseport + +import "syscall" + +var reusePort = syscall.SO_REUSEPORT diff --git a/reuseport/reuseport_linux.go b/reuseport/reuseport_linux.go new file mode 100644 index 0000000..483aba1 --- /dev/null +++ b/reuseport/reuseport_linux.go @@ -0,0 +1,3 @@ +package reuseport + +var reusePort = 0x0F diff --git a/reuseport/reuseport_test.go b/reuseport/reuseport_test.go new file mode 100644 index 0000000..c1879f1 --- /dev/null +++ b/reuseport/reuseport_test.go @@ -0,0 +1,98 @@ +package reuseport + +import ( + "fmt" + "io/ioutil" + "net" + "testing" + "time" +) + +func TestNewListener(t *testing.T) { + addr := "localhost:10081" + serversCount := 20 + requestsCount := 1000 + + var lns []net.Listener + doneCh := make(chan struct{}, serversCount) + + for i := 0; i < serversCount; i++ { + ln, err := NewListener("tcp4", addr) + if err != nil { + t.Fatalf("cannot create listener %d: %s", i, err) + } + go func() { + serveEcho(t, ln) + doneCh <- struct{}{} + }() + lns = append(lns, ln) + } + + for i := 0; i < requestsCount; i++ { + c, err := net.Dial("tcp4", addr) + if err != nil { + t.Fatalf("%d. unexpected error when dialing: %s", i, err) + } + req := fmt.Sprintf("request number %d", i) + if _, err = c.Write([]byte(req)); err != nil { + t.Fatalf("%d. unexpected error when writing request: %s", i, err) + } + if err = c.(*net.TCPConn).CloseWrite(); err != nil { + t.Fatalf("%d. unexpected error when closing write end of the connection: %s", i, err) + } + + var resp []byte + ch := make(chan struct{}) + go func() { + if resp, err = ioutil.ReadAll(c); err != nil { + t.Fatalf("%d. unexpected error when reading response: %s", i, err) + } + close(ch) + }() + select { + case <-ch: + case <-time.After(200 * time.Millisecond): + t.Fatalf("%d. timeout when waiting for response: %s", i, err) + } + + if string(resp) != req { + t.Fatalf("%d. unexpected response %q. Expecting %q", i, resp, req) + } + if err = c.Close(); err != nil { + t.Fatalf("%d. unexpected error when closing connection: %s", i, err) + } + } + + for _, ln := range lns { + if err := ln.Close(); err != nil { + t.Fatalf("unexpected error when closing listener: %s", err) + } + } + + for i := 0; i < serversCount; i++ { + select { + case <-doneCh: + case <-time.After(200 * time.Millisecond): + t.Fatalf("timeout when waiting for servers to be closed") + } + } +} + +func serveEcho(t *testing.T, ln net.Listener) { + for { + c, err := ln.Accept() + if err != nil { + break + } + req, err := ioutil.ReadAll(c) + if err != nil { + t.Fatalf("unepxected error when reading request: %s", err) + } + if _, err = c.Write(req); err != nil { + t.Fatalf("unexpected error when writing response: %s", err) + } + if err = c.Close(); err != nil { + t.Fatalf("unexpected error when closing connection: %s", err) + } + } +} |