Skip to content
This repository was archived by the owner on Feb 7, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module github.com/logica0419/websocket-workshop
go 1.17

require (
github.com/gofrs/uuid v4.2.0+incompatible
github.com/gorilla/websocket v1.4.2
github.com/labstack/echo/v4 v4.6.3
github.com/labstack/gommon v0.3.1
)
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gofrs/uuid v4.2.0+incompatible h1:yyYWMnhkhrKwwr8gAOcOCYxOOscHgDS9yZgBrnJfGa0=
github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/labstack/echo/v4 v4.6.3 h1:VhPuIZYxsbPmo4m9KAkMU/el2442eB7EBFFhNTTT9ac=
github.com/labstack/echo/v4 v4.6.3/go.mod h1:Hk5OiHj0kDqmFq7aHe7eDqI7CUhuCrfpupQtLGGLm7A=
github.com/labstack/gommon v0.3.1 h1:OomWaJXm7xR6L1HmEtGyQf26TEn7V6X88mktX9kee9o=
Expand Down
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import (
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/labstack/gommon/log"
"github.com/logica0419/websocket-workshop/streamer"
)

func main() {
s := streamer.NewStreamer()

e := echo.New()

e.Logger.SetLevel(log.DEBUG)
Expand All @@ -22,7 +25,10 @@ func main() {
api.GET("/ping", func(c echo.Context) error {
return c.String(http.StatusOK, "pong")
})
api.GET("/ws", s.ConnectWS)
}

go s.Listen()

e.Logger.Panic(e.Start(":8080"))
}
59 changes: 59 additions & 0 deletions streamer/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package streamer

import (
"github.com/gofrs/uuid"
"github.com/gorilla/websocket"
)

type client struct {
id uuid.UUID
roomID string
conn *websocket.Conn
receiver chan receiveData
sender chan string
closer chan bool
}

func newClient(roomID string, conn *websocket.Conn, listener chan receiveData) *client {
return &client{
id: uuid.Must(uuid.NewV4()),
roomID: roomID,
conn: conn,
receiver: listener,
sender: make(chan string),
closer: make(chan bool),
}
}

func (c *client) listen() {
for {
msgType, msg, err := c.conn.ReadMessage()
if err != nil {
c.closer <- true
return
}
if msgType != websocket.TextMessage {
continue
}

data := receiveData{
clientID: c.id,
roomID: c.roomID,
payload: msg,
}

c.receiver <- data
}
}

func (c *client) send() {
for {
msg := <-c.sender

err := c.conn.WriteMessage(websocket.TextMessage, []byte(msg))
if err != nil {
c.closer <- true
return
}
}
}
35 changes: 35 additions & 0 deletions streamer/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package streamer

import (
"net/http"

"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
)

var upgrader = websocket.Upgrader{}

func (s *Streamer) ConnectWS(c echo.Context) error {
roomID := c.QueryParam("room")
if roomID == "" {
return echo.NewHTTPError(http.StatusBadRequest, "roomID is required")
}

conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err)
}

client := newClient(roomID, conn, s.receiver)

s.clients[client.id] = client

go client.listen()
go client.send()

<-client.closer

delete(s.clients, client.id)

return c.NoContent(http.StatusOK)
}
42 changes: 42 additions & 0 deletions streamer/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package streamer

import (
"encoding/json"
"fmt"
"log"
"time"
)

type payload struct {
Method string `json:"method,omitempty"`
Args json.RawMessage `json:"args,omitempty"`
}

type messageArgs struct {
Message string `json:"message"`
}

func (s *Streamer) handleWebSocket(data receiveData) error {
var req payload
err := json.Unmarshal(data.payload, &req)
if err != nil {
return err
}

switch req.Method {
case "message":
var args messageArgs
err = json.Unmarshal(req.Args, &args)
if err != nil {
return err
}
s.sendToRoom(data.roomID, args.Message)
case "time":
s.sendToRoom(data.roomID, time.Now().Format("01/02 15:04:05"))
default:
log.Printf("unknown method: %s", req.Method)
return fmt.Errorf("unknown method: %s", req.Method)
}

return nil
}
46 changes: 46 additions & 0 deletions streamer/streamer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package streamer

import (
"log"

"github.com/gofrs/uuid"
)

type receiveData struct {
clientID uuid.UUID
roomID string
payload []byte
}

type Streamer struct {
clients map[uuid.UUID]*client
receiver chan receiveData
}

func NewStreamer() *Streamer {
return &Streamer{
clients: make(map[uuid.UUID]*client),
receiver: make(chan receiveData),
}
}

func (s *Streamer) Listen() {
for {
data := <-s.receiver

go func() {
err := s.handleWebSocket(data)
if err != nil {
log.Printf("failed to handle websocket: %v", err)
}
}()
}
}

func (s *Streamer) sendToRoom(roomID, msg string) {
for _, c := range s.clients {
if c.roomID == roomID {
c.sender <- msg
}
}
}