Skip to content

Commit d5f6275

Browse files
authored
feat: improve subscription logging and metrics (#173)
1 parent e19460f commit d5f6275

File tree

6 files changed

+194
-35
lines changed

6 files changed

+194
-35
lines changed

example/subscription/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ require (
99
)
1010

1111
require (
12-
github.com/coder/websocket v1.8.12 // indirect
12+
github.com/coder/websocket v1.8.13 // indirect
1313
github.com/google/uuid v1.6.0 // indirect
1414
github.com/gorilla/websocket v1.4.1 // indirect
1515
)

example/subscription/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo=
2-
github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
1+
github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE=
2+
github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
33
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
44
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
55
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=

example/subscription/subscription_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,15 @@ func TestTransportWS_basicTest(t *testing.T) {
125125
// wait until the subscription client connects to the server
126126
time.Sleep(2 * time.Second)
127127

128+
stats := gql.GetWebSocketStats()
129+
if stats.TotalActiveConnections != 1 {
130+
t.Fatalf("got total active websocket connections: %v, want: 1", stats.TotalActiveConnections)
131+
}
132+
133+
if stats.TotalClosedConnections != 0 {
134+
t.Fatalf("got total closed websocket connections: %v, want: 0", stats.TotalClosedConnections)
135+
}
136+
128137
// call a mutation request to send message to the subscription
129138
/*
130139
mutation ($msg: String!) {
@@ -149,6 +158,15 @@ func TestTransportWS_basicTest(t *testing.T) {
149158
}
150159

151160
<-stop
161+
162+
stats = gql.GetWebSocketStats()
163+
if stats.TotalActiveConnections != 0 {
164+
t.Fatalf("got total active websocket connections: %v, want: 0", stats.TotalActiveConnections)
165+
}
166+
167+
if stats.TotalClosedConnections != 1 {
168+
t.Fatalf("got total closed websocket connections: %v, want: 1", stats.TotalClosedConnections)
169+
}
152170
}
153171

154172
func TestTransportWS_exitWhenNoSubscription(t *testing.T) {

subscription.go

Lines changed: 125 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,10 @@ type SubscriptionProtocol interface {
139139
// SubscriptionContext represents a shared context for protocol implementations with the websocket connection inside.
140140
type SubscriptionContext struct {
141141
context.Context
142-
client *SubscriptionClient
142+
143+
client *SubscriptionClient
144+
// The unique id across the session life-cycle.
145+
sessionID uuid.UUID
143146
websocketConn WebsocketConn
144147

145148
connectionInitAt time.Time
@@ -154,10 +157,15 @@ type SubscriptionContext struct {
154157
// Log prints condition logging with message type filters.
155158
func (sc *SubscriptionContext) Log(
156159
message interface{},
157-
source string,
160+
metadata map[string]any,
158161
opType OperationMessageType,
159162
) {
160-
sc.client.printLog(message, source, opType)
163+
metadata["session_id"] = sc.sessionID
164+
if conn, ok := sc.websocketConn.(*WebsocketHandler); ok {
165+
metadata["connection_id"] = conn.GetID()
166+
}
167+
168+
sc.client.printLog(message, metadata, opType)
161169
}
162170

163171
// OnConnectionAlive executes the OnConnectionAlive callback if exists.
@@ -348,8 +356,12 @@ func (sc *SubscriptionContext) Close() error {
348356

349357
var err error
350358
sc.SetClosed(true)
359+
conn := sc.GetWebsocketConn()
360+
sc.Log("closing the current session", map[string]any{
361+
"connection_active": conn != nil,
362+
}, GQLInternal)
351363

352-
if conn := sc.GetWebsocketConn(); conn != nil {
364+
if conn != nil {
353365
if sc.client.onDisconnected != nil {
354366
sc.client.onDisconnected()
355367
}
@@ -369,7 +381,9 @@ func (sc *SubscriptionContext) Close() error {
369381
// Send emits a message to the graphql server.
370382
func (sc *SubscriptionContext) Send(message interface{}, opType OperationMessageType) error {
371383
if conn := sc.GetWebsocketConn(); conn != nil {
372-
sc.Log(message, "client", opType)
384+
sc.Log(message, map[string]any{
385+
"source": "client",
386+
}, opType)
373387

374388
return conn.WriteJSON(message)
375389
}
@@ -379,6 +393,7 @@ func (sc *SubscriptionContext) Send(message interface{}, opType OperationMessage
379393

380394
// initializes the websocket connection.
381395
func (sc *SubscriptionContext) init(parentContext context.Context) error {
396+
sc.Log("initialize new session", map[string]any{}, GQLInternal)
382397
now := time.Now()
383398

384399
for {
@@ -423,7 +438,9 @@ func (sc *SubscriptionContext) init(parentContext context.Context) error {
423438

424439
sc.Log(
425440
fmt.Sprintf("%s. retry in %d second...", err.Error(), sc.client.retryDelay/time.Second),
426-
"client",
441+
map[string]any{
442+
"source": "client",
443+
},
427444
GQLInternal,
428445
)
429446
time.Sleep(sc.client.retryDelay)
@@ -448,7 +465,9 @@ func (sc *SubscriptionContext) run() {
448465
if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "EOF") ||
449466
errors.Is(err, net.ErrClosed) ||
450467
strings.Contains(err.Error(), "connection reset by peer") {
451-
sc.Log(err.Error(), "client", GQLConnectionError)
468+
sc.Log(err.Error(), map[string]any{
469+
"source": "client",
470+
}, GQLConnectionError)
452471
sc.client.errorChan <- errRestartSubscriptionClient
453472

454473
return
@@ -470,7 +489,9 @@ func (sc *SubscriptionContext) run() {
470489
}
471490

472491
if closeStatus < 0 {
473-
sc.Log(err, "server", GQL_CONNECTION_ERROR)
492+
sc.Log(err, map[string]any{
493+
"source": "client",
494+
}, GQL_CONNECTION_ERROR)
474495

475496
continue
476497
}
@@ -482,14 +503,18 @@ func (sc *SubscriptionContext) run() {
482503
websocket.StatusTryAgainLater,
483504
websocket.StatusMessageTooBig,
484505
websocket.StatusInvalidFramePayloadData:
485-
sc.Log(err, "server", GQL_CONNECTION_ERROR)
506+
sc.Log(err, map[string]any{
507+
"source": "server",
508+
}, GQL_CONNECTION_ERROR)
486509
sc.client.errorChan <- errRestartSubscriptionClient
487510
case websocket.StatusNormalClosure, websocket.StatusAbnormalClosure:
488511
// close event from websocket client, exiting...
489512
_ = sc.client.close(sc)
490513
default:
491514
// let the user to handle unknown errors manually.
492-
sc.Log(err, "server", GQL_CONNECTION_ERROR)
515+
sc.Log(err, map[string]any{
516+
"source": "server",
517+
}, GQL_CONNECTION_ERROR)
493518
sc.client.errorChan <- err
494519
}
495520

@@ -531,7 +556,9 @@ func (sc *SubscriptionContext) startWebsocketKeepAlive(c WebsocketConn, interval
531556
// Ping the websocket. You might want to handle any potential errors.
532557
err := c.Ping()
533558
if err != nil {
534-
sc.Log("Failed to ping server", "client", GQLInternal)
559+
sc.Log("Failed to ping server", map[string]any{
560+
"source": "client",
561+
}, GQLInternal)
535562
sc.client.errorChan <- errRestartSubscriptionClient
536563

537564
return
@@ -1137,7 +1164,9 @@ func (sc *SubscriptionClient) RunWithContext(ctx context.Context) error {
11371164
time.Since(
11381165
session.getConnectionInitAt(),
11391166
) > sc.connectionInitialisationTimeout {
1140-
sc.printLog("Connection initialisation timeout", "client", GQLInternal)
1167+
sc.printLog("Connection initialisation timeout", map[string]any{
1168+
"source": "client",
1169+
}, GQLInternal)
11411170
sc.errorChan <- &websocket.CloseError{
11421171
Code: StatusConnectionInitialisationTimeout,
11431172
Reason: "Connection initialisation timeout",
@@ -1152,7 +1181,9 @@ func (sc *SubscriptionClient) RunWithContext(ctx context.Context) error {
11521181
) > sc.websocketConnectionIdleTimeout {
11531182
sc.printLog(
11541183
ErrWebsocketConnectionIdleTimeout.Error(),
1155-
"client",
1184+
map[string]any{
1185+
"source": "client",
1186+
},
11561187
GQLInternal,
11571188
)
11581189
sc.errorChan <- ErrWebsocketConnectionIdleTimeout
@@ -1269,6 +1300,8 @@ func (sc *SubscriptionClient) close(session *SubscriptionContext) error {
12691300
return nil
12701301
}
12711302

1303+
sc.printLog("close the subscription client", map[string]any{}, GQLInternal)
1304+
12721305
sc.setClientStatus(scStatusClosing)
12731306
if sc.cancel != nil {
12741307
sc.cancel()
@@ -1323,6 +1356,7 @@ func (sc *SubscriptionClient) initNewSession(ctx context.Context) (*Subscription
13231356

13241357
subContext := &SubscriptionContext{
13251358
client: sc,
1359+
sessionID: uuid.New(),
13261360
subscriptions: make(map[string]Subscription),
13271361
}
13281362

@@ -1380,15 +1414,17 @@ func (sc *SubscriptionClient) checkSubscriptionStatuses(session *SubscriptionCon
13801414
SubscriptionRunning,
13811415
SubscriptionWaiting,
13821416
}) == 0 {
1383-
session.Log("no running subscription. exiting...", "client", GQLInternal)
1417+
session.Log("no running subscription. exiting...", map[string]any{
1418+
"source": "client",
1419+
}, GQLInternal)
13841420
_ = sc.close(session)
13851421
}
13861422
}
13871423

13881424
// prints condition logging with message type filters.
13891425
func (sc *SubscriptionClient) printLog(
13901426
message interface{},
1391-
source string,
1427+
metadata map[string]any,
13921428
opType OperationMessageType,
13931429
) {
13941430
if sc.log == nil {
@@ -1401,7 +1437,8 @@ func (sc *SubscriptionClient) printLog(
14011437
}
14021438
}
14031439

1404-
sc.log(message, source)
1440+
metadata["type"] = opType
1441+
sc.log(message, metadata)
14051442
}
14061443

14071444
// The payload format of both subscriptions-transport-ws and graphql-ws are the same.
@@ -1453,11 +1490,20 @@ func parseInt32Ranges(codes []string) ([][]int32, error) {
14531490
type WebsocketHandler struct {
14541491
*websocket.Conn
14551492

1493+
// The unique id of the current websocket connections.
1494+
// The ID will be generated whenever initializing a new Websocket connection.
1495+
id uuid.UUID
1496+
14561497
ctx context.Context
14571498
readTimeout time.Duration
14581499
writeTimeout time.Duration
14591500
}
14601501

1502+
// GetID gets the identity of the Websocket connection.
1503+
func (wh WebsocketHandler) GetID() uuid.UUID {
1504+
return wh.id
1505+
}
1506+
14611507
// WriteJSON implements the function to encode and send message in json format to the server.
14621508
func (wh *WebsocketHandler) WriteJSON(v interface{}) error {
14631509
ctx, cancel := context.WithTimeout(wh.ctx, wh.writeTimeout)
@@ -1484,6 +1530,8 @@ func (wh *WebsocketHandler) Ping() error {
14841530

14851531
// Close implements the function to close the websocket connection.
14861532
func (wh *WebsocketHandler) Close() error {
1533+
globalWebSocketStats.AddDeadConnection(wh.id)
1534+
14871535
return wh.Conn.Close(websocket.StatusNormalClosure, "close websocket")
14881536
}
14891537

@@ -1528,8 +1576,12 @@ func newWebsocketConn(
15281576
return nil, err
15291577
}
15301578

1579+
id := uuid.New()
1580+
globalWebSocketStats.AddActiveConnection(id)
1581+
15311582
return &WebsocketHandler{
15321583
Conn: c,
1584+
id: id,
15331585
ctx: ctx,
15341586
readTimeout: options.ReadTimeout,
15351587
writeTimeout: options.WriteTimeout,
@@ -1572,3 +1624,60 @@ type WebsocketOptions struct {
15721624
// The graphql server depends on the Sec-WebSocket-Protocol header to return the correct message specification
15731625
Subprotocols []string
15741626
}
1627+
1628+
// WebSocketStats hold statistic data of WebSocket connections for subscription.
1629+
type WebSocketStats struct {
1630+
TotalActiveConnections int
1631+
TotalClosedConnections int
1632+
ActiveConnectionIDs []uuid.UUID
1633+
}
1634+
1635+
type websocketStats struct {
1636+
sync sync.Mutex
1637+
activeConnectionIDs map[uuid.UUID]bool
1638+
closedConnectionIDs map[uuid.UUID]bool
1639+
}
1640+
1641+
var globalWebSocketStats = websocketStats{
1642+
activeConnectionIDs: map[uuid.UUID]bool{},
1643+
closedConnectionIDs: map[uuid.UUID]bool{},
1644+
}
1645+
1646+
// AddActiveConnection adds an active connection id to the list.
1647+
func (ws *websocketStats) AddActiveConnection(id uuid.UUID) {
1648+
ws.sync.Lock()
1649+
defer ws.sync.Unlock()
1650+
1651+
ws.activeConnectionIDs[id] = true
1652+
}
1653+
1654+
// AddDeadConnection adds an dead connection id to the list.
1655+
func (ws *websocketStats) AddDeadConnection(id uuid.UUID) {
1656+
ws.sync.Lock()
1657+
defer ws.sync.Unlock()
1658+
delete(ws.activeConnectionIDs, id)
1659+
1660+
ws.closedConnectionIDs[id] = true
1661+
}
1662+
1663+
// GetStats gets the websocket stats.
1664+
func (ws *websocketStats) GetStats() WebSocketStats {
1665+
ws.sync.Lock()
1666+
defer ws.sync.Unlock()
1667+
1668+
activeIDs := make([]uuid.UUID, 0, len(ws.activeConnectionIDs))
1669+
for id := range ws.activeConnectionIDs {
1670+
activeIDs = append(activeIDs, id)
1671+
}
1672+
1673+
return WebSocketStats{
1674+
ActiveConnectionIDs: activeIDs,
1675+
TotalActiveConnections: len(globalWebSocketStats.activeConnectionIDs),
1676+
TotalClosedConnections: len(globalWebSocketStats.closedConnectionIDs),
1677+
}
1678+
}
1679+
1680+
// GetWebSocketStats gets the websocket stats.
1681+
func GetWebSocketStats() WebSocketStats {
1682+
return globalWebSocketStats.GetStats()
1683+
}

0 commit comments

Comments
 (0)