diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java index 025cfeac..12827f46 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java @@ -194,6 +194,18 @@ void testNotifyResourcesListChanged() { assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException(); } + @Test + void testNotifyResourcesUpdated() { + var mcpAsyncServer = McpServer.async(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build(); + + StepVerifier + .create(mcpAsyncServer + .notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI))) + .verifyComplete(); + + assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException(); + } + @Test void testAddResource() { var mcpAsyncServer = McpServer.async(createMcpTransportProvider()) diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java index e313454b..eefcdf9a 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java @@ -191,6 +191,17 @@ void testNotifyResourcesListChanged() { assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException(); } + @Test + void testNotifyResourcesUpdated() { + var mcpSyncServer = McpServer.sync(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build(); + + assertThatCode(() -> mcpSyncServer + .notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI))) + .doesNotThrowAnyException(); + + assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException(); + } + @Test void testAddResource() { var mcpSyncServer = McpServer.sync(createMcpTransportProvider()) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index e3a997ba..bd4d240b 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -215,6 +215,18 @@ public class McpAsyncClient { notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED, asyncResourcesChangeNotificationHandler(resourcesChangeConsumersFinal)); + // Resources Update Notification + List, Mono>> resourcesUpdateConsumersFinal = new ArrayList<>(); + resourcesUpdateConsumersFinal + .add((notification) -> Mono.fromRunnable(() -> logger.debug("Resources updated: {}", notification))); + + if (!Utils.isEmpty(features.resourcesUpdateConsumers())) { + resourcesUpdateConsumersFinal.addAll(features.resourcesUpdateConsumers()); + } + + notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED, + asyncResourcesUpdatedNotificationHandler(resourcesUpdateConsumersFinal)); + // Prompts Change Notification List, Mono>> promptsChangeConsumersFinal = new ArrayList<>(); promptsChangeConsumersFinal @@ -708,6 +720,24 @@ private NotificationHandler asyncResourcesChangeNotificationHandler( .then()); } + private NotificationHandler asyncResourcesUpdatedNotificationHandler( + List, Mono>> resourcesUpdateConsumers) { + return params -> { + McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification = transport.unmarshalFrom(params, + new TypeReference<>() { + }); + + return readResource(new McpSchema.ReadResourceRequest(resourcesUpdatedNotification.uri())) + .flatMap(readResourceResult -> Flux.fromIterable(resourcesUpdateConsumers) + .flatMap(consumer -> consumer.apply(readResourceResult.contents())) + .onErrorResume(error -> { + logger.error("Error handling resource update notification", error); + return Mono.empty(); + }) + .then()); + }; + } + // -------------------------- // Prompts // -------------------------- diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java index a1dc1168..3c5b9f8a 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java @@ -169,6 +169,8 @@ class SyncSpec { private final List>> resourcesChangeConsumers = new ArrayList<>(); + private final List>> resourcesUpdateConsumers = new ArrayList<>(); + private final List>> promptsChangeConsumers = new ArrayList<>(); private final List> loggingConsumers = new ArrayList<>(); @@ -363,8 +365,8 @@ public SyncSpec loggingConsumers(List, Mono>> resourcesChangeConsumers = new ArrayList<>(); + private final List, Mono>> resourcesUpdateConsumers = new ArrayList<>(); + private final List, Mono>> promptsChangeConsumers = new ArrayList<>(); private final List>> loggingConsumers = new ArrayList<>(); @@ -553,6 +557,23 @@ public AsyncSpec resourcesChangeConsumer( return this; } + /** + * Adds a consumer to be notified when a specific resource is updated. This allows + * the client to react to changes in individual resources, such as updates to + * their content or metadata. + * @param resourcesUpdateConsumer A consumer function that processes the updated + * resource and returns a Mono indicating the completion of the processing. Must + * not be null. + * @return This builder instance for method chaining. + * @throws IllegalArgumentException If the resourcesUpdateConsumer is null. + */ + public AsyncSpec resourcesUpdateConsumer( + Function, Mono> resourcesUpdateConsumer) { + Assert.notNull(resourcesUpdateConsumer, "Resources update consumer must not be null"); + this.resourcesUpdateConsumers.add(resourcesUpdateConsumer); + return this; + } + /** * Adds a consumer to be notified when the available prompts change. This allows * the client to react to changes in the server's prompt templates, such as new @@ -605,8 +626,8 @@ public AsyncSpec loggingConsumers( public McpAsyncClient build() { return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout, new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots, - this.toolsChangeConsumers, this.resourcesChangeConsumers, this.promptsChangeConsumers, - this.loggingConsumers, this.samplingHandler)); + this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers, + this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler)); } } diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java index 284b93f8..170915a9 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java @@ -64,6 +64,7 @@ class McpClientFeatures { record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities, Map roots, List, Mono>> toolsChangeConsumers, List, Mono>> resourcesChangeConsumers, + List, Mono>> resourcesUpdateConsumers, List, Mono>> promptsChangeConsumers, List>> loggingConsumers, Function> samplingHandler) { @@ -82,6 +83,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c Map roots, List, Mono>> toolsChangeConsumers, List, Mono>> resourcesChangeConsumers, + List, Mono>> resourcesUpdateConsumers, List, Mono>> promptsChangeConsumers, List>> loggingConsumers, Function> samplingHandler) { @@ -96,6 +98,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c this.toolsChangeConsumers = toolsChangeConsumers != null ? toolsChangeConsumers : List.of(); this.resourcesChangeConsumers = resourcesChangeConsumers != null ? resourcesChangeConsumers : List.of(); + this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of(); this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of(); this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of(); this.samplingHandler = samplingHandler; @@ -122,8 +125,13 @@ public static Async fromSync(Sync syncSpec) { .subscribeOn(Schedulers.boundedElastic())); } - List, Mono>> promptsChangeConsumers = new ArrayList<>(); + List, Mono>> resourcesUpdateConsumers = new ArrayList<>(); + for (Consumer> consumer : syncSpec.resourcesUpdateConsumers()) { + resourcesUpdateConsumers.add(r -> Mono.fromRunnable(() -> consumer.accept(r)) + .subscribeOn(Schedulers.boundedElastic())); + } + List, Mono>> promptsChangeConsumers = new ArrayList<>(); for (Consumer> consumer : syncSpec.promptsChangeConsumers()) { promptsChangeConsumers.add(p -> Mono.fromRunnable(() -> consumer.accept(p)) .subscribeOn(Schedulers.boundedElastic())); @@ -139,8 +147,8 @@ public static Async fromSync(Sync syncSpec) { .fromCallable(() -> syncSpec.samplingHandler().apply(r)) .subscribeOn(Schedulers.boundedElastic()); return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(), - toolsChangeConsumers, resourcesChangeConsumers, promptsChangeConsumers, loggingConsumers, - samplingHandler); + toolsChangeConsumers, resourcesChangeConsumers, resourcesUpdateConsumers, promptsChangeConsumers, + loggingConsumers, samplingHandler); } } @@ -160,6 +168,7 @@ public static Async fromSync(Sync syncSpec) { public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities, Map roots, List>> toolsChangeConsumers, List>> resourcesChangeConsumers, + List>> resourcesUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, Function samplingHandler) { @@ -171,6 +180,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili * @param roots the roots. * @param toolsChangeConsumers the tools change consumers. * @param resourcesChangeConsumers the resources change consumers. + * @param resourcesUpdateConsumers the resource update consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. * @param samplingHandler the sampling handler. @@ -178,6 +188,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities, Map roots, List>> toolsChangeConsumers, List>> resourcesChangeConsumers, + List>> resourcesUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, Function samplingHandler) { @@ -192,6 +203,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl this.toolsChangeConsumers = toolsChangeConsumers != null ? toolsChangeConsumers : List.of(); this.resourcesChangeConsumers = resourcesChangeConsumers != null ? resourcesChangeConsumers : List.of(); + this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of(); this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of(); this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of(); this.samplingHandler = samplingHandler; diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java index 1efa13de..02ad955b 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java @@ -430,6 +430,15 @@ public Mono notifyResourcesListChanged() { return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED, null); } + /** + * Notifies clients that the resources have updated. + * @return A Mono that completes when all clients have been notified + */ + public Mono notifyResourcesUpdated(McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification) { + return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED, + resourcesUpdatedNotification); + } + private McpServerSession.RequestHandler resourcesListRequestHandler() { return (exchange, params) -> { var resourceList = this.resources.values() diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java index bf310450..91f8d9e4 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java @@ -141,6 +141,13 @@ public void notifyResourcesListChanged() { this.asyncServer.notifyResourcesListChanged().block(); } + /** + * Notify clients that the resources have updated. + */ + public void notifyResourcesUpdated(McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification) { + this.asyncServer.notifyResourcesUpdated(resourcesUpdatedNotification).block(); + } + /** * Notify clients that the list of available prompts has changed. */ diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java index 8df8a158..ba9cc0f2 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java @@ -66,6 +66,8 @@ private McpSchema() { public static final String METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED = "notifications/resources/list_changed"; + public static final String METHOD_NOTIFICATION_RESOURCES_UPDATED = "notifications/resources/updated"; + public static final String METHOD_RESOURCES_TEMPLATES_LIST = "resources/templates/list"; public static final String METHOD_RESOURCES_SUBSCRIBE = "resources/subscribe"; @@ -1111,6 +1113,17 @@ public record ProgressNotification(// @formatter:off @JsonProperty("total") Double total) { }// @formatter:on + /** + * The Model Context Protocol (MCP) provides a standardized way for servers to send + * resources update message to clients. + * + * @param uri The updated resource uri. + */ + @JsonIgnoreProperties(ignoreUnknown = true) + public record ResourcesUpdatedNotification(// @formatter:off + @JsonProperty("uri") String uri) { + }// @formatter:on + /** * The Model Context Protocol (MCP) provides a standardized way for servers to send * structured log messages to clients. Clients can control logging verbosity by diff --git a/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java b/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java index df0b0c72..dd9f6589 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java @@ -193,6 +193,18 @@ void testNotifyResourcesListChanged() { assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException(); } + @Test + void testNotifyResourcesUpdated() { + var mcpAsyncServer = McpServer.async(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build(); + + StepVerifier + .create(mcpAsyncServer + .notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI))) + .verifyComplete(); + + assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException(); + } + @Test void testAddResource() { var mcpAsyncServer = McpServer.async(createMcpTransportProvider()) diff --git a/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java b/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java index 0b38da85..6cbb8632 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java @@ -190,6 +190,17 @@ void testNotifyResourcesListChanged() { assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException(); } + @Test + void testNotifyResourcesUpdated() { + var mcpSyncServer = McpServer.sync(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build(); + + assertThatCode(() -> mcpSyncServer + .notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI))) + .doesNotThrowAnyException(); + + assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException(); + } + @Test void testAddResource() { var mcpSyncServer = McpServer.sync(createMcpTransportProvider())