Skip to content

Commit ac25fae

Browse files
authored
enhance websocket stats (#176)
1 parent 33d5b8a commit ac25fae

File tree

3 files changed

+196
-59
lines changed

3 files changed

+196
-59
lines changed

subscription.go

Lines changed: 2 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1530,7 +1530,7 @@ func (wh *WebsocketHandler) Ping() error {
15301530

15311531
// Close implements the function to close the websocket connection.
15321532
func (wh *WebsocketHandler) Close() error {
1533-
globalWebSocketStats.AddClosedConnection(wh.id)
1533+
defaultWebSocketStats.AddClosedConnection(wh.id)
15341534

15351535
return wh.Conn.Close(websocket.StatusNormalClosure, "close websocket")
15361536
}
@@ -1577,7 +1577,7 @@ func newWebsocketConn(
15771577
}
15781578

15791579
id := uuid.New()
1580-
globalWebSocketStats.AddActiveConnection(id)
1580+
defaultWebSocketStats.AddActiveConnection(id)
15811581

15821582
return &WebsocketHandler{
15831583
Conn: c,
@@ -1624,60 +1624,3 @@ type WebsocketOptions struct {
16241624
// The graphql server depends on the Sec-WebSocket-Protocol header to return the correct message specification
16251625
Subprotocols []string
16261626
}
1627-
1628-
// WebSocketStats hold statistic data of WebSocket connections for subscription.
1629-
type WebSocketStats struct {
1630-
TotalActiveConnections int
1631-
TotalClosedConnections int
1632-
ActiveConnectionIDs []uuid.UUID
1633-
}
1634-
1635-
type websocketStats struct {
1636-
sync sync.Mutex
1637-
activeConnectionIDs map[uuid.UUID]bool
1638-
closedConnectionIDs map[uuid.UUID]bool
1639-
}
1640-
1641-
var globalWebSocketStats = websocketStats{
1642-
activeConnectionIDs: map[uuid.UUID]bool{},
1643-
closedConnectionIDs: map[uuid.UUID]bool{},
1644-
}
1645-
1646-
// AddActiveConnection adds an active connection id to the list.
1647-
func (ws *websocketStats) AddActiveConnection(id uuid.UUID) {
1648-
ws.sync.Lock()
1649-
defer ws.sync.Unlock()
1650-
1651-
ws.activeConnectionIDs[id] = true
1652-
}
1653-
1654-
// AddClosedConnection adds an dead connection id to the list.
1655-
func (ws *websocketStats) AddClosedConnection(id uuid.UUID) {
1656-
ws.sync.Lock()
1657-
defer ws.sync.Unlock()
1658-
delete(ws.activeConnectionIDs, id)
1659-
1660-
ws.closedConnectionIDs[id] = true
1661-
}
1662-
1663-
// GetStats gets the websocket stats.
1664-
func (ws *websocketStats) GetStats() WebSocketStats {
1665-
ws.sync.Lock()
1666-
defer ws.sync.Unlock()
1667-
1668-
activeIDs := make([]uuid.UUID, 0, len(ws.activeConnectionIDs))
1669-
for id := range ws.activeConnectionIDs {
1670-
activeIDs = append(activeIDs, id)
1671-
}
1672-
1673-
return WebSocketStats{
1674-
ActiveConnectionIDs: activeIDs,
1675-
TotalActiveConnections: len(globalWebSocketStats.activeConnectionIDs),
1676-
TotalClosedConnections: len(globalWebSocketStats.closedConnectionIDs),
1677-
}
1678-
}
1679-
1680-
// GetWebSocketStats gets the websocket stats.
1681-
func GetWebSocketStats() WebSocketStats {
1682-
return globalWebSocketStats.GetStats()
1683-
}

subscription_metrics.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package graphql
2+
3+
import (
4+
"sync"
5+
6+
"github.com/google/uuid"
7+
)
8+
9+
// Closed connection IDs are cached to make sure the accuracy of the gauge metric when there are duplicated closed called.
10+
var maxClosedConnectionCacheSize int = 100
11+
12+
var defaultWebSocketStats = websocketStats{
13+
activeConnectionIDs: map[uuid.UUID]bool{},
14+
closedConnectionIDs: make([]uuid.UUID, 0, maxClosedConnectionCacheSize),
15+
}
16+
17+
// GetWebSocketStats gets the websocket stats.
18+
func GetWebSocketStats() WebSocketStats {
19+
return defaultWebSocketStats.GetStats()
20+
}
21+
22+
// ResetWebSocketStats reset the websocket stats.
23+
func ResetWebSocketStats() {
24+
defaultWebSocketStats.Reset()
25+
}
26+
27+
// WebSocketStats hold statistic data of WebSocket connections for subscription.
28+
type WebSocketStats struct {
29+
TotalActiveConnections uint
30+
TotalClosedConnections uint
31+
ActiveConnectionIDs []uuid.UUID
32+
}
33+
34+
type websocketStats struct {
35+
sync sync.Mutex
36+
activeConnectionIDs map[uuid.UUID]bool
37+
closedConnectionIDs []uuid.UUID
38+
totalClosedConnections uint
39+
}
40+
41+
// AddActiveConnection adds an active connection id to the list.
42+
func (ws *websocketStats) AddActiveConnection(id uuid.UUID) {
43+
ws.sync.Lock()
44+
defer ws.sync.Unlock()
45+
46+
ws.activeConnectionIDs[id] = true
47+
}
48+
49+
// AddClosedConnection adds an dead connection id to the list.
50+
func (ws *websocketStats) AddClosedConnection(id uuid.UUID) {
51+
ws.sync.Lock()
52+
defer ws.sync.Unlock()
53+
delete(ws.activeConnectionIDs, id)
54+
55+
for _, item := range ws.closedConnectionIDs {
56+
// do not increase if the connection id already exists in the queue
57+
if item == id {
58+
return
59+
}
60+
}
61+
62+
ws.totalClosedConnections++
63+
if len(ws.closedConnectionIDs) < maxClosedConnectionCacheSize {
64+
ws.closedConnectionIDs = append(ws.closedConnectionIDs, id)
65+
66+
return
67+
}
68+
69+
for i := 1; i < maxClosedConnectionCacheSize; i++ {
70+
ws.closedConnectionIDs[i-1] = ws.closedConnectionIDs[i]
71+
}
72+
73+
ws.closedConnectionIDs[maxClosedConnectionCacheSize-1] = id
74+
}
75+
76+
// Reset the websocket stats.
77+
func (ws *websocketStats) Reset() {
78+
ws.sync.Lock()
79+
defer ws.sync.Unlock()
80+
81+
ws.activeConnectionIDs = map[uuid.UUID]bool{}
82+
ws.closedConnectionIDs = make([]uuid.UUID, 0, maxClosedConnectionCacheSize)
83+
ws.totalClosedConnections = 0
84+
}
85+
86+
// GetStats gets the websocket stats.
87+
func (ws *websocketStats) GetStats() WebSocketStats {
88+
ws.sync.Lock()
89+
defer ws.sync.Unlock()
90+
91+
activeIDs := make([]uuid.UUID, 0, len(ws.activeConnectionIDs))
92+
for id := range ws.activeConnectionIDs {
93+
activeIDs = append(activeIDs, id)
94+
}
95+
96+
return WebSocketStats{
97+
ActiveConnectionIDs: activeIDs,
98+
TotalActiveConnections: uint(len(ws.activeConnectionIDs)),
99+
TotalClosedConnections: ws.totalClosedConnections,
100+
}
101+
}
102+
103+
// SetMaxClosedConnectionMetricCacheSize sets the max cache size of closed connections metrics.
104+
func SetMaxClosedConnectionMetricCacheSize(size uint) {
105+
maxClosedConnectionCacheSize = int(size)
106+
107+
if len(defaultWebSocketStats.closedConnectionIDs) <= maxClosedConnectionCacheSize {
108+
return
109+
}
110+
111+
defaultWebSocketStats.sync.Lock()
112+
defer defaultWebSocketStats.sync.Unlock()
113+
114+
newSlice := make([]uuid.UUID, maxClosedConnectionCacheSize)
115+
startIndex := len(defaultWebSocketStats.closedConnectionIDs) - maxClosedConnectionCacheSize
116+
117+
for i := 0; i < maxClosedConnectionCacheSize; i++ {
118+
newSlice[i] = defaultWebSocketStats.closedConnectionIDs[startIndex+i]
119+
}
120+
121+
defaultWebSocketStats.closedConnectionIDs = newSlice
122+
}

subscription_metrics_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package graphql
2+
3+
import (
4+
"testing"
5+
6+
"github.com/google/uuid"
7+
)
8+
9+
func TestWebSocketStats(t *testing.T) {
10+
ResetWebSocketStats()
11+
12+
for i := 0; i < 10; i++ {
13+
defaultWebSocketStats.AddActiveConnection(uuid.New())
14+
}
15+
16+
for i := 0; i < 110; i++ {
17+
defaultWebSocketStats.AddClosedConnection(uuid.New())
18+
}
19+
20+
stats := GetWebSocketStats()
21+
22+
if got, expected := stats.TotalActiveConnections, 10; got != uint(expected) {
23+
t.Errorf("total active connections, expected: %d, got: %d", expected, got)
24+
}
25+
26+
if got, expected := stats.TotalClosedConnections, 110; got != uint(expected) {
27+
t.Errorf("total closed connections, expected: %d, got: %d", expected, got)
28+
}
29+
30+
if got, expected := len(defaultWebSocketStats.closedConnectionIDs), 100; got != expected {
31+
t.Errorf("total closed connection ids, expected: %d, got: %d", expected, got)
32+
}
33+
34+
SetMaxClosedConnectionMetricCacheSize(10)
35+
36+
if got, expected := stats.TotalClosedConnections, 110; got != uint(expected) {
37+
t.Errorf("total closed connections, expected: %d, got: %d", expected, got)
38+
}
39+
40+
if got, expected := len(defaultWebSocketStats.closedConnectionIDs), 10; got != expected {
41+
t.Errorf("total closed connection ids, expected: %d, got: %d", expected, got)
42+
}
43+
44+
for i := 0; i < 10; i++ {
45+
defaultWebSocketStats.AddClosedConnection(uuid.New())
46+
}
47+
48+
stats = GetWebSocketStats()
49+
50+
if got, expected := stats.TotalClosedConnections, 120; got != uint(expected) {
51+
t.Errorf("total closed connections, expected: %d, got: %d", expected, got)
52+
}
53+
54+
if got, expected := len(defaultWebSocketStats.closedConnectionIDs), 10; got != expected {
55+
t.Errorf("total closed connection ids, expected: %d, got: %d", expected, got)
56+
}
57+
58+
ResetWebSocketStats()
59+
stats = GetWebSocketStats()
60+
61+
if got, expected := stats.TotalActiveConnections, 0; got != uint(expected) {
62+
t.Errorf("total active connections, expected: %d, got: %d", expected, got)
63+
}
64+
65+
if got, expected := stats.TotalClosedConnections, 0; got != uint(expected) {
66+
t.Errorf("total closed connections, expected: %d, got: %d", expected, got)
67+
}
68+
69+
if got, expected := len(defaultWebSocketStats.closedConnectionIDs), 0; got != expected {
70+
t.Errorf("total closed connection ids, expected: %d, got: %d", expected, got)
71+
}
72+
}

0 commit comments

Comments
 (0)