Skip to content

Commit 9d1f447

Browse files
ezynda3andigclaude
authored
HTTP Sampling Improvements (#601)
* feat: implement sampling support for Streamable HTTP transport Implements sampling capability for HTTP transport, resolving issue #419. Enables servers to send sampling requests to HTTP clients via SSE and receive LLM-generated responses. ## Key Changes ### Core Implementation - Add `BidirectionalInterface` support to `StreamableHTTP` - Implement `SetRequestHandler` for server-to-client requests - Enhance SSE parsing to handle requests alongside responses/notifications - Add `handleIncomingRequest` and `sendResponseToServer` methods ### HTTP-Specific Features - Leverage existing MCP headers (`Mcp-Session-Id`, `Mcp-Protocol-Version`) - Bidirectional communication via HTTP POST for responses - Proper JSON-RPC request/response handling over HTTP ### Error Handling - Add specific JSON-RPC error codes for different failure scenarios: - `-32601` (Method not found) when no handler configured - `-32603` (Internal error) for sampling failures - `-32800` (Request cancelled/timeout) for context errors - Enhanced error messages with sampling-specific context ### Testing & Examples - Comprehensive test suite in `streamable_http_sampling_test.go` - Complete working example in `examples/sampling_http_client/` - Tests cover success flows, error scenarios, and interface compliance ## Technical Details The implementation maintains full backward compatibility while adding bidirectional communication support. Server requests are processed asynchronously to avoid blocking the SSE stream reader. HTTP transport now supports the complete sampling flow that was previously only available in stdio and inprocess transports. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * feat: implement server-side sampling support for HTTP transport This completes the server-side implementation of sampling support for HTTP transport, addressing the remaining requirements from issue #419. Changes: - Enhanced streamableHttpSession to implement SessionWithSampling interface - Added bidirectional SSE communication for server-to-client requests - Implemented session registry for proper response correlation - Added comprehensive error handling with JSON-RPC error codes - Created extensive test suite covering all scenarios - Added working example server with sampling tools Key Features: - Server can send sampling requests to HTTP clients via SSE - Clients respond via HTTP POST with proper session correlation - Queue overflow protection and timeout handling - Compatible with existing HTTP transport architecture 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fix: replace time.Sleep with synchronization primitives in tests Replace flaky time.Sleep calls with proper synchronization using channels and sync.WaitGroup to make tests deterministic and avoid race conditions. Also improves error handling robustness in test servers with proper JSON decoding error checks. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fix: improve request detection logic and add nil pointer checks - Make request vs response detection more robust by checking for presence of "method" field instead of relying on nil Result/Error fields - Add nil pointer check in sendResponseToServer function to prevent panics These changes improve reliability against malformed messages and edge cases. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fix: correct misleading comment about response delivery The comment incorrectly stated that responses are broadcast to all sessions, but the implementation actually delivers responses to the specific session identified by sessionID using the activeSessions registry. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fix: implement EnableSampling() to properly declare sampling capability Previously, EnableSampling() was a no-op that didn't actually enable the sampling capability in the server's declared capabilities. Changes: - Add Sampling field to mcp.ServerCapabilities struct - Add sampling field to internal serverCapabilities struct - Update EnableSampling() to set the sampling capability flag - Update handleInitialize() to include sampling in capability response - Add test to verify sampling capability is properly declared Now when EnableSampling() is called, the server will properly declare sampling capability during initialization, allowing clients to know that the server supports sending sampling requests. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fix: prevent panic from unsafe type assertion in example server Replace unsafe type assertion result.Content.(mcp.TextContent).Text with safe type checking to handle cases where Content might not be a TextContent struct. Now gracefully handles different content types without panicking. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fix: add missing EnableSampling() call in interface test The SamplingInterface test was missing the EnableSampling() call, which is necessary to activate sampling features for proper testing. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fix: expand error test coverage and avoid t.Fatalf - Replace single error test with comprehensive table-driven tests - Add test cases for invalid request IDs and malformed results - Replace t.Fatalf with t.Errorf to follow project conventions - Use proper session ID format for valid test scenarios 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fix: eliminate recursive response handling and improve routing - Remove recursive call in RequestSampling that could cause stack overflow - Remove problematic response re-queuing to global channel - Update deliverSamplingResponse to route responses directly to dedicated request channels via samplingRequests map lookup - This prevents ordering issues and ensures responses reach the correct waiting request 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fix: improve sampling response delivery robustness - Modified deliverSamplingResponse to return error instead of just logging - Added proper error handling for disconnected sessions - Improved error messages for debugging - Updated test expectations to match new error behavior 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fix: add graceful shutdown handling to sampling client - Add signal handling for SIGINT and SIGTERM - Move defer statement after error checking - Improve shutdown error handling 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fix: improve context handling in streamable HTTP transport - Add timeout context for SSE response processing (30s default) - Add timeout for individual connection attempts in listenForever (10s) - Use context-aware sleep in retry logic - Ensure async goroutines properly respect context cancellation 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fix: improve error message for notification channel queue full condition - Make error message more descriptive and actionable - Provide clearer debugging information about why the channel is blocked 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * refactor: rename struct variable for clarity in message parsing - Rename 'baseMessage' to 'jsonMessage' for more neutral naming - Improves code readability and follows consistent naming conventions 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * test: add concurrent sampling requests test with response association Add test verifying that concurrent sampling requests are handled correctly when the second request completes faster than the first. The test ensures: - Responses are correctly associated with their request IDs - Server processes requests concurrently without blocking - Completion order follows actual processing time, not submission order 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fix: improve context handling in async goroutine Create new context with 30-second timeout for request handling to prevent long-running handlers from blocking indefinitely. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * refactor: replace interface{} with any throughout codebase Replace all occurrences of interface{} with the modern Go any type alias for improved readability and consistency with current Go best practices. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fix: improve context handling in async goroutine for StreamableHTTP Create timeout context from parent context instead of context.Background() to ensure request handlers respect parent context cancellation. Addresses review comment about context handling in async goroutine. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * refactor: remove unused samplingResponseChan field from session struct The samplingResponseChan field was declared but never used in the streamableHttpSession struct. Remove it and update tests accordingly. Addresses review comment about unused fields in session struct. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * feat: add graceful shutdown handling to sampling HTTP client example Add signal handling for SIGINT and SIGTERM to allow graceful shutdown of the sampling HTTP client example. This prevents indefinite blocking and provides better production-ready behavior. Addresses review comment about adding graceful shutdown handling. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * refactor: remove unused mu field from streamableHttpSession Removes unused sync.RWMutex field that was flagged by golangci-lint. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * Add e2e test * wip * wip * fixes * fixes * fix race condition --------- Co-authored-by: andig <[email protected]> Co-authored-by: Claude <[email protected]> Co-authored-by: andig <[email protected]>
1 parent a1dd4ef commit 9d1f447

File tree

10 files changed

+844
-35
lines changed

10 files changed

+844
-35
lines changed

client/client.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,19 @@ func (c *Client) handleSamplingRequestTransport(ctx context.Context, request tra
502502
}
503503
}
504504

505+
// Fix content parsing - HTTP transport unmarshals TextContent as map[string]any
506+
// Use the helper function to properly handle content from different transports
507+
for i := range params.Messages {
508+
if contentMap, ok := params.Messages[i].Content.(map[string]any); ok {
509+
// Parse the content map into a proper Content type
510+
content, err := mcp.ParseContent(contentMap)
511+
if err != nil {
512+
return nil, fmt.Errorf("failed to parse content for message %d: %w", i, err)
513+
}
514+
params.Messages[i].Content = content
515+
}
516+
}
517+
505518
// Create the MCP request
506519
mcpRequest := mcp.CreateMessageRequest{
507520
Request: mcp.Request{

client/transport/streamable_http.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -594,10 +594,14 @@ func (c *StreamableHTTP) IsOAuthEnabled() bool {
594594
func (c *StreamableHTTP) listenForever(ctx context.Context) {
595595
c.logger.Infof("listening to server forever")
596596
for {
597-
connectCtx, cancel := context.WithCancel(ctx)
598-
err := c.createGETConnectionToServer(connectCtx)
599-
cancel()
600-
597+
// Use the original context for continuous listening - no per-iteration timeout
598+
// The SSE connection itself will detect disconnections via the underlying HTTP transport,
599+
// and the context cancellation will propagate from the parent to stop listening gracefully.
600+
// We don't add an artificial timeout here because:
601+
// 1. Persistent SSE connections are meant to stay open indefinitely
602+
// 2. Network-level timeouts and keep-alives handle connection health
603+
// 3. Context cancellation (user-initiated or system shutdown) provides clean shutdown
604+
err := c.createGETConnectionToServer(ctx)
601605
if errors.Is(err, ErrGetMethodNotAllowed) {
602606
// server does not support listening
603607
c.logger.Errorf("server does not support listening")

0 commit comments

Comments
 (0)