diff --git a/.gitignore b/.gitignore index 99e4088..3b8dbb7 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,7 @@ _authortools /_examples/**/node_modules .directory -*.exe \ No newline at end of file +*.exe +*.exe~ +.idea +/vendor \ No newline at end of file diff --git a/_examples/gobwasalive/main.go b/_examples/gobwasalive/main.go new file mode 100644 index 0000000..3bf5158 --- /dev/null +++ b/_examples/gobwasalive/main.go @@ -0,0 +1,126 @@ +package main + +import ( + "context" + "log" + "net/http" + "os" + "os/signal" + "strconv" + "syscall" + "time" + + "github.com/kataras/neffos" + "github.com/kataras/neffos/gobwasalive" +) + +var events = neffos.WithTimeout{ + ReadTimeout: 60 * time.Second, + WriteTimeout: 60 * time.Second, + + Namespaces: neffos.Namespaces{ + "v1": neffos.Events{ + "echo": onEcho, + }, + }, +} + +func onEcho(c *neffos.NSConn, msg neffos.Message) error { + body := string(msg.Body) + log.Println(body) + + if !c.Conn.IsClient() { + newBody := append([]byte("echo back: "), msg.Body...) + return neffos.Reply(newBody) + } + + return nil +} + +func main() { + args := os.Args[1:] + if len(args) == 0 { + log.Fatalf("expected program to start with 'server' or 'client' argument") + } + side := args[0] + + clientNum := 0 + if len(args) == 2 { + clientNum, _ = strconv.Atoi(args[1]) + log.Printf("Start clientNum: %v", clientNum) + } + + switch side { + case "server": + runServer() + case "client": + if clientNum == 0 { + runClient() + } else { + runClientMany(clientNum) + } + + default: + log.Fatalf("unexpected argument, expected 'server' or 'client' but got '%s'", side) + } +} + +func runServer() { + upgrader := gobwasalive.NewUpgrader(50 * time.Second) + websocketServer := neffos.New(upgrader, events) + + websocketServer.OnConnect = func(c *neffos.Conn) error { + log.Printf("OnConnect cid: %v", c.ID()) + return nil + } + + websocketServer.OnDisconnect = func(c *neffos.Conn) { + log.Printf("OnDisconnect cid: %v", c.ID()) + } + + router := http.NewServeMux() + router.Handle("/echo", websocketServer) + + ticker := time.NewTicker(30 * time.Second) + go func() { + for { + select { + case <-ticker.C: + log.Printf("total clients: %v", websocketServer.GetTotalConnections()) + } + } + }() + + log.Println("Serving websockets on localhost:8080/echo") + log.Fatal(http.ListenAndServe(":8080", router)) +} + +func runClient() { + ctx := context.Background() + dialer := gobwasalive.NewDialer(45 * time.Second) + client, err := neffos.Dial(ctx, dialer, "ws://localhost:8080/echo", events) + if err != nil { + panic(err) + } + + _, err = client.Connect(ctx, "v1") + if err != nil { + panic(err) + } + + //c.Emit("echo", []byte("Greetings!")) + + // a channel that blocks until client is terminated, + // i.e by CTRL/CMD +C. + <-client.NotifyClose +} + +func runClientMany(clientNum int) { + for i := 0; i < clientNum; i++ { + time.Sleep(5 * time.Millisecond) + go runClient() + } + ch := make(chan os.Signal) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) + log.Println(<-ch) +} diff --git a/go.mod b/go.mod index 77aeac6..1ecb1e4 100644 --- a/go.mod +++ b/go.mod @@ -2,12 +2,27 @@ module github.com/kataras/neffos go 1.12 +replace ( + golang.org/x/build => github.com/golang/build v0.0.0-20190709001953-30c0e6b89ea0 + golang.org/x/crypto => github.com/golang/crypto v0.0.0-20190701094942-4def268fd1a4 + golang.org/x/debug => github.com/golang/debug v0.0.0-20190515041333-621e2d3f35da + golang.org/x/lint => github.com/golang/lint v0.0.0-20190409202823-959b441ac422 + golang.org/x/net => github.com/golang/net v0.0.0-20190628185345-da137c7871d7 + golang.org/x/perf => github.com/golang/perf v0.0.0-20190620143337-7c3f2128ad9b + golang.org/x/sync => github.com/golang/sync v0.0.0-20190423024810-112230192c58 + golang.org/x/sys => github.com/golang/sys v0.0.0-20190710143415-6ec70d6a5542 + golang.org/x/text => github.com/golang/text v0.3.2 + golang.org/x/time => github.com/golang/time v0.0.0-20190308202827-9d24e82272b4 + golang.org/x/tools => github.com/golang/tools v0.0.0-20190711191110-9a621aea19f8 +) + require ( + github.com/RussellLuo/timingwheel v0.0.0-20190518031256-7b3d146a266a github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee // indirect github.com/gobwas/pool v0.2.0 // indirect github.com/gobwas/ws v1.0.1 github.com/gorilla/websocket v1.4.0 github.com/iris-contrib/go.uuid v2.0.0+incompatible github.com/mediocregopher/radix/v3 v3.3.0 - github.com/nats-io/nats.go v1.8.1 // indirect + github.com/nats-io/nats.go v1.8.1 ) diff --git a/go.sum b/go.sum index b5b6fff..fa21ab2 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,25 @@ +github.com/RussellLuo/timingwheel v0.0.0-20190518031256-7b3d146a266a h1:BBXTjl+/xTT72HpWqPe2oqKex5fZwsqHaxUEVqjT2Sg= +github.com/RussellLuo/timingwheel v0.0.0-20190518031256-7b3d146a266a/go.mod h1:3VIJp8oOAlnDUnPy3kwyBGqsMiJJujqTP6ic9Jv6NbM= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.0.1 h1:iYpM3WoNpsexO6bqCN1MnvVRylnKg6278zivIZDRXUM= github.com/gobwas/ws v1.0.1/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= +github.com/golang/crypto v0.0.0-20190701094942-4def268fd1a4 h1:SqpWDZAu6UkmbvUTCtyNpBZLY8110TJ7bgxIki3pZw0= +github.com/golang/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +github.com/golang/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +github.com/golang/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +github.com/golang/sys v0.0.0-20190710143415-6ec70d6a5542 h1:nj3zoh9xX6lavzCnatfuuBVNMi37YqbhEThbgW5ysWs= +github.com/golang/sys v0.0.0-20190710143415-6ec70d6a5542/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +github.com/golang/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +github.com/golang/tools v0.0.0-20190711191110-9a621aea19f8/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/iris-contrib/go.uuid v2.0.0+incompatible h1:XZubAYg61/JwnJNbZilGjf3b3pB80+OQg2qf6c8BfWE= github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/+fafWORmlnuysV2EMP8MW+qe0= +github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg= github.com/mediocregopher/radix/v3 v3.3.0 h1:oacPXPKHJg0hcngVVrdtTnfGJiS+PtwoQwTBZGFlV4k= github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ= github.com/nats-io/nats.go v1.8.1 h1:6lF/f1/NN6kzUDBz6pyvQDEXO39jqXcWRLu/tKjtOUQ= @@ -16,3 +28,5 @@ github.com/nats-io/nkeys v0.0.2 h1:+qM7QpgXnvDDixitZtQUBDY9w/s9mu1ghS+JIbsrx6M= github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= diff --git a/gobwasalive/dialer.go b/gobwasalive/dialer.go new file mode 100644 index 0000000..d9bad2b --- /dev/null +++ b/gobwasalive/dialer.go @@ -0,0 +1,39 @@ +package gobwasalive + +import ( + "context" + "github.com/RussellLuo/timingwheel" + "time" + + "github.com/kataras/neffos" + + gobwas "github.com/gobwas/ws" +) + +// DefaultDialer is a gobwas/ws dialer with all fields set to the default values. +//var DefaultDialer = Dialer(gobwas.DefaultDialer) + +var twDialer = timingwheel.NewTimingWheel(50*time.Millisecond, 100) +var twDialerRunning = false + +// Dialer is a `neffos.Dialer` type for the gobwas/ws subprotocol implementation. +// Should be used on `Dial` to create a new client/client-side connection. +// To send headers to the server set the dialer's `Header` field to a `gobwas.HandshakeHeaderHTTP`. +func dialer(dialer gobwas.Dialer, idleTime time.Duration) neffos.Dialer { + return func(ctx context.Context, url string) (neffos.Socket, error) { + underline, _, _, err := dialer.Dial(ctx, url) + if err != nil { + return nil, err + } + + return newSocket(underline, nil, true, idleTime, twDialer), nil + } +} + +func NewDialer(idleTime time.Duration) neffos.Dialer { + if !twDialerRunning { + twDialer.Start() + twDialerRunning = true + } + return dialer(gobwas.DefaultDialer, idleTime) +} diff --git a/gobwasalive/helpers_go19.go b/gobwasalive/helpers_go19.go new file mode 100644 index 0000000..4f9f384 --- /dev/null +++ b/gobwasalive/helpers_go19.go @@ -0,0 +1,12 @@ +// +build go1.9 + +package gobwasalive + +import gobwas "github.com/gobwas/ws" + +// Options is just an alias for the `gobwas/ws.Dialer` struct type. +type Options = gobwas.Dialer + +// Header is an alias to the adapter that allows the use of `http.Header` as +// `gobwas/ws.Dialer.HandshakeHeader`. +type Header = gobwas.HandshakeHeaderHTTP diff --git a/gobwasalive/socket.go b/gobwasalive/socket.go new file mode 100644 index 0000000..e51b097 --- /dev/null +++ b/gobwasalive/socket.go @@ -0,0 +1,191 @@ +package gobwasalive + +import ( + "io" + "io/ioutil" + "log" + "net" + "net/http" + "sync" + "time" + + "github.com/RussellLuo/timingwheel" + gobwas "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" +) + +// Socket completes the `neffos.Socket` interface, +// it describes the underline websocket connection. +type Socket struct { + UnderlyingConn net.Conn + request *http.Request + + reader *wsutil.Reader + controlHandler wsutil.FrameHandlerFunc + state gobwas.State + + mu sync.Mutex + + //after idleTime not received any MESSAGE/PING/PONG + //will send a PING to remote. keepalive + //set zero will disable this feature + idleTime time.Duration + + //global timingwheel for better performance of timer + tw *timingwheel.TimingWheel +} + +func newSocket(underline net.Conn, request *http.Request, client bool, + idleTime time.Duration, tw *timingwheel.TimingWheel) *Socket { + state := gobwas.StateServerSide + if client { + state = gobwas.StateClientSide + } + + controlHandler := wsutil.ControlFrameHandler(underline, state) + + reader := &wsutil.Reader{ + Source: underline, + State: state, + CheckUTF8: true, + SkipHeaderCheck: false, + // "intermediate" frames, that possibly could + // be received between text/binary continuation frames. + // Read `gobwas/wsutil/reader#NextReader`. + // + OnIntermediate: controlHandler, + } + + return &Socket{ + UnderlyingConn: underline, + request: request, + state: state, + reader: reader, + controlHandler: controlHandler, + idleTime: idleTime, + tw: tw, + } +} + +// NetConn returns the underline net connection. +func (s *Socket) NetConn() net.Conn { + return s.UnderlyingConn +} + +// Request returns the http request value. +func (s *Socket) Request() *http.Request { + return s.request +} + +const MinPingTime = 10 * time.Second + +func (s *Socket) SendPing() { + if err := s.write([]byte(""), gobwas.OpPing, MinPingTime); err != nil { + log.Printf("gobwasalive Socket Send keepalive websocket PING, err: %v", err) + } +} + +// ReadData reads binary or text messages from the remote connection. +func (s *Socket) ReadData(timeout time.Duration) ([]byte, error) { + delayTime := s.idleTime + var delayTimer *timingwheel.Timer + for { + if timeout > 0 { + if err := s.UnderlyingConn.SetReadDeadline(time.Now().Add(timeout)); err != nil { + log.Printf("gobwasalive Socket SetReadDeadline, err: %v", err) + } + if timeout-MinPingTime < s.idleTime { + delayTime = MinPingTime + } + } + + if delayTime > 0 { + delayTimer = s.tw.AfterFunc(delayTime, s.SendPing) + } + + hdr, err := s.reader.NextFrame() + + if delayTimer != nil { + delayTimer.Stop() + delayTimer = nil + } + + if err != nil { + if err == io.EOF { + return nil, io.ErrUnexpectedEOF // for io.ReadAll to return an error if connection remotely closed. + } + return nil, err + } + + if hdr.OpCode == gobwas.OpClose { + return nil, io.ErrUnexpectedEOF // for io.ReadAll to return an error if connection remotely closed. + } + + if hdr.OpCode.IsControl() { + err = s.controlHandler(hdr, s.reader) + if err != nil { + return nil, err + } + continue + } + + if hdr.OpCode&gobwas.OpBinary == 0 && hdr.OpCode&gobwas.OpText == 0 { + err = s.reader.Discard() + if err != nil { + return nil, err + } + continue + } + + return ioutil.ReadAll(s.reader) + } + + // for { + // if timeout > 0 { + // s.UnderlyingConn.SetReadDeadline(time.Now().Add(timeout)) + // } + + // b, code, err := wsutil.ReadData(s.UnderlyingConn, s.state) + // if err != nil { + // return nil, err + // } + + // if code != defaultOp { + // continue + // } + + // return b, nil + // } +} + +// WriteBinary sends a binary message to the remote connection. +func (s *Socket) WriteBinary(body []byte, timeout time.Duration) error { + return s.write(body, gobwas.OpBinary, timeout) +} + +// WriteText sends a text message to the remote connection. +func (s *Socket) WriteText(body []byte, timeout time.Duration) error { + return s.write(body, gobwas.OpText, timeout) +} + +func (s *Socket) write(body []byte, op gobwas.OpCode, timeout time.Duration) error { + s.mu.Lock() + if timeout > 0 { + if err := s.UnderlyingConn.SetWriteDeadline(time.Now().Add(timeout)); err != nil { + log.Printf("gobwasalive Socket SetWriteDeadline, err: %v", err) + } + } + + // println("write: " + string(body)) + err := wsutil.WriteMessage(s.UnderlyingConn, s.state, op, body) + + if timeout > 0 { //must set it back to zero, otherwise it will get EOF error + if err := s.UnderlyingConn.SetWriteDeadline(time.Time{}); err != nil { + log.Printf("gobwasalive Socket SetWriteDeadline back, err: %v", err) + } + } + + s.mu.Unlock() + + return err +} diff --git a/gobwasalive/upgrader.go b/gobwasalive/upgrader.go new file mode 100644 index 0000000..76a756e --- /dev/null +++ b/gobwasalive/upgrader.go @@ -0,0 +1,38 @@ +package gobwasalive + +import ( + "net/http" + "time" + + "github.com/kataras/neffos" + + "github.com/RussellLuo/timingwheel" + gobwas "github.com/gobwas/ws" +) + +// DefaultUpgrader is a gobwas/ws HTTP Upgrader with all fields set to the default values. +//var DefaultUpgrader = Upgrader(gobwas.HTTPUpgrader{}) + +var twServer = timingwheel.NewTimingWheel(50*time.Millisecond, 100) +var twServerRunning = false + +// Upgrader is a `neffos.Upgrader` type for the gobwas/ws subprotocol implementation. +// Should be used on `neffos.New` to construct the neffos server. +func upgrader(upgrader gobwas.HTTPUpgrader, idleTime time.Duration) neffos.Upgrader { + return func(w http.ResponseWriter, r *http.Request) (neffos.Socket, error) { + underline, _, _, err := upgrader.Upgrade(r, w) + if err != nil { + return nil, err + } + + return newSocket(underline, r, false, idleTime, twServer), nil + } +} + +func NewUpgrader(idleTime time.Duration) neffos.Upgrader { + if !twServerRunning { + twServer.Start() + twServerRunning = true + } + return upgrader(gobwas.HTTPUpgrader{}, idleTime) +}