Skip to content

Commit aa44ce7

Browse files
committed
feat: Add Multi-Instance Support via Custom Channel Builders
## Overview This PR adds support for running multiple SSE server instances with shared state. It introduces a flexible, technology-agnostic approach using channel builders that allow for distributed message delivery across server instances. ## Key Features - **Custom Event Queue Builders**: Allow customizing how server-sent events are distributed across instances - **Custom Notification Channel Builders**: Support distributed notification delivery - **Redis Implementation**: Provides a ready-to-use implementation using Redis pub/sub - **No Core Dependencies**: Maintains zero external dependencies in the core library - **Comprehensive Tests**: Includes tests using a mock broker to verify functionality ## Technical Approach Rather than tightly coupling the library to a specific technology (like Redis), this PR introduces builder functions that can create custom communication channels: ```go // Create a multi-instance SSE server using Redis sseServer := server.NewSSEServer( mcpServer, server.WithBaseURL("https://api.example.com"), server.NewRedisEventQueueBuilder(redisClient), server.NewRedisNotificationChannelBuilder(redisClient), ) ``` This approach has several advantages: - Users can implement different message broker solutions (Redis, RabbitMQ, SQS, etc.) - The core library remains focused and dependency-free - Testing is simplified using mock implementations ## Implementation Notes - Channel builders are executed once per SSE session - The Redis implementation uses pub/sub for real-time message delivery - Communication channels are bidirectional for both producing and consuming messages - Tests use a mock broker to verify multi-instance behavior without external dependencies ## Example Usage ```go import ( "github.com/go-redis/redis/v8" "github.com/mark3labs/mcp-go/server" ) func createMultiInstanceServer() *server.SSEServer { // Connect to Redis redisClient := redis.NewClient(&redis.Options{ Addr: "redis:6379", }) // Create MCP server mcpServer := server.NewMCPServer( "my-app", "1.0.0", server.WithToolCapabilities(true), ) // Create SSE server with Redis-backed channels return server.NewSSEServer( mcpServer, server.WithBaseURL("https://api.example.com"), server.NewRedisEventQueueBuilder(redisClient), server.NewRedisNotificationChannelBuilder(redisClient), ) } ``` This solution enables horizontally scaling SSE servers while maintaining session consistency across instances.
1 parent 6760d87 commit aa44ce7

File tree

5 files changed

+589
-2
lines changed

5 files changed

+589
-2
lines changed

examples/redis_channels/go.mod

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
module github.com/mark3labs/mcp-go/examples/redis_channels
2+
3+
go 1.24.2
4+
5+
require (
6+
github.com/mark3labs/mcp-go v0.22.0
7+
github.com/redis/go-redis/v9 v9.7.3
8+
)
9+
10+
require (
11+
github.com/cespare/xxhash/v2 v2.2.0 // indirect
12+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
13+
github.com/google/uuid v1.6.0 // indirect
14+
github.com/spf13/cast v1.7.1 // indirect
15+
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
16+
)
17+
18+
replace github.com/mark3labs/mcp-go => ../../

examples/redis_channels/go.sum

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
2+
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
3+
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
4+
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
5+
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
6+
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
7+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
8+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
9+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
10+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
11+
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
12+
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
13+
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
14+
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
15+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
16+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
17+
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
18+
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
19+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
20+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
21+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
22+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
23+
github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM=
24+
github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA=
25+
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
26+
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
27+
github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
28+
github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
29+
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
30+
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
31+
github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4=
32+
github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4=
33+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
34+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

examples/redis_channels/main.go

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"log"
8+
"time"
9+
10+
"github.com/mark3labs/mcp-go/mcp"
11+
"github.com/mark3labs/mcp-go/server"
12+
redis "github.com/redis/go-redis/v9"
13+
)
14+
15+
// NewRedisEventQueueBuilder creates a builder that produces Redis-backed event queues
16+
func NewRedisEventQueueBuilder(redisClient *redis.Client) server.SSEOption {
17+
return server.WithEventQueueBuilder(func(sessionID string) chan string {
18+
// Create a buffered channel for local use
19+
localChan := make(chan string, 100)
20+
21+
// Key for this session's events in Redis
22+
redisKey := fmt.Sprintf("sse:events:%s", sessionID)
23+
24+
// Start a goroutine to receive messages from Redis and forward to local channel
25+
go func() {
26+
ctx := context.Background()
27+
pubsub := redisClient.Subscribe(ctx, redisKey)
28+
defer pubsub.Close()
29+
30+
ch := pubsub.Channel()
31+
for {
32+
select {
33+
case msg := <-ch:
34+
select {
35+
case localChan <- msg.Payload:
36+
// Message forwarded successfully
37+
default:
38+
// Channel is full, might need handling
39+
}
40+
case <-time.After(30 * time.Minute):
41+
// Timeout after inactivity
42+
return
43+
}
44+
}
45+
}()
46+
47+
// Create a wrapper channel that publishes to Redis when written to
48+
wrappedChan := make(chan string, 100)
49+
go func() {
50+
ctx := context.Background()
51+
for event := range wrappedChan {
52+
// Forward to local channel for this instance
53+
select {
54+
case localChan <- event:
55+
// Also publish to Redis for other instances
56+
redisClient.Publish(ctx, redisKey, event)
57+
default:
58+
// Local channel is full, just publish to Redis
59+
redisClient.Publish(ctx, redisKey, event)
60+
}
61+
}
62+
close(localChan) // Close local channel when wrapped is closed
63+
}()
64+
65+
return wrappedChan
66+
})
67+
}
68+
69+
// NewRedisNotificationChannelBuilder creates a builder that produces Redis-backed notification channels
70+
func NewRedisNotificationChannelBuilder(redisClient *redis.Client) server.SSEOption {
71+
return server.WithNotificationChannelBuilder(func(sessionID string) chan mcp.JSONRPCNotification {
72+
// Create a buffered channel for local use
73+
localChan := make(chan mcp.JSONRPCNotification, 100)
74+
75+
// Key for this session's notifications in Redis
76+
redisKey := fmt.Sprintf("sse:notifications:%s", sessionID)
77+
78+
// Start a goroutine to receive notifications from Redis and forward to local channel
79+
go func() {
80+
ctx := context.Background()
81+
pubsub := redisClient.Subscribe(ctx, redisKey)
82+
defer pubsub.Close()
83+
84+
ch := pubsub.Channel()
85+
for {
86+
select {
87+
case msg := <-ch:
88+
var notification mcp.JSONRPCNotification
89+
if err := json.Unmarshal([]byte(msg.Payload), &notification); err == nil {
90+
select {
91+
case localChan <- notification:
92+
// Notification forwarded successfully
93+
default:
94+
// Channel is full, might need handling
95+
}
96+
}
97+
case <-time.After(30 * time.Minute):
98+
// Timeout after inactivity
99+
return
100+
}
101+
}
102+
}()
103+
104+
// Create a wrapper channel that publishes to Redis when written to
105+
wrappedChan := make(chan mcp.JSONRPCNotification, 100)
106+
go func() {
107+
ctx := context.Background()
108+
for notification := range wrappedChan {
109+
notificationData, err := json.Marshal(notification)
110+
if err != nil {
111+
continue
112+
}
113+
114+
// Forward to local channel for this instance
115+
select {
116+
case localChan <- notification:
117+
// Also publish to Redis for other instances
118+
redisClient.Publish(ctx, redisKey, string(notificationData))
119+
default:
120+
// Local channel is full, just publish to Redis
121+
redisClient.Publish(ctx, redisKey, string(notificationData))
122+
}
123+
}
124+
close(localChan) // Close local channel when wrapped is closed
125+
}()
126+
127+
return wrappedChan
128+
})
129+
}
130+
131+
func main() {
132+
// Create MCP server
133+
mcpServer := server.NewMCPServer(
134+
"example-server",
135+
"1.0.0",
136+
server.WithResourceCapabilities(true, true),
137+
server.WithToolCapabilities(true),
138+
)
139+
140+
// Configure Redis client
141+
redisOpts := &redis.Options{
142+
Addr: "localhost:6379",
143+
}
144+
redisClient := redis.NewClient(redisOpts)
145+
146+
// Create SSE server with Redis-backed channels
147+
sseServer := server.NewSSEServer(
148+
mcpServer,
149+
server.WithBaseURL("https://api.example.com"),
150+
server.WithBasePath("/api"),
151+
NewRedisEventQueueBuilder(redisClient),
152+
NewRedisNotificationChannelBuilder(redisClient),
153+
)
154+
155+
// Start the server
156+
log.Println("Starting server on :8080")
157+
if err := sseServer.Start(":8080"); err != nil {
158+
log.Fatalf("Server failed: %v", err)
159+
}
160+
}

0 commit comments

Comments
 (0)