From f222c446dc5fb4cfaa1e715636da0cc93ba65b0e Mon Sep 17 00:00:00 2001 From: gongwn1 Date: Mon, 26 May 2025 12:08:37 +0800 Subject: [PATCH 1/5] feat: client resource update handler --- .../client/McpAsyncClient.java | 23 +++++++++++++++++++ .../client/McpClient.java | 12 ++++++---- .../client/McpClientFeatures.java | 20 ++++++++++++---- .../modelcontextprotocol/spec/McpSchema.java | 2 ++ 4 files changed, 49 insertions(+), 8 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index e3a997ba..d6c8b34a 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -215,6 +215,18 @@ public class McpAsyncClient { notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED, asyncResourcesChangeNotificationHandler(resourcesChangeConsumersFinal)); + // Resource Update Notification + List>> resourcesUpdateConsumersFinal = new ArrayList<>(); + resourcesUpdateConsumersFinal + .add((notification) -> Mono.fromRunnable(() -> logger.debug("Resources updated: {}", notification))); + + if (!Utils.isEmpty(features.resourcesUpdateConsumers())) { + resourcesUpdateConsumersFinal.addAll(features.resourcesUpdateConsumers()); + } + + notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED, + asyncResourcesUpdatedNotificationHandler(resourcesChangeConsumersFinal)); + // Prompts Change Notification List, Mono>> promptsChangeConsumersFinal = new ArrayList<>(); promptsChangeConsumersFinal @@ -708,6 +720,17 @@ private NotificationHandler asyncResourcesChangeNotificationHandler( .then()); } + private NotificationHandler asyncResourcesUpdatedNotificationHandler( + List, Mono>> resourcesUpdateConsumers) { + return params -> listResources().flatMap(listResourcesResult -> Flux.fromIterable(resourcesUpdateConsumers) + .flatMap(consumer -> consumer.apply(listResourcesResult.resources())) + .onErrorResume(error -> { + logger.error("Error handling resources updated notification", error); + return Mono.empty(); + }) + .then()); + } + // -------------------------- // Prompts // -------------------------- diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java index a1dc1168..e297ff3a 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java @@ -169,6 +169,8 @@ class SyncSpec { private final List>> resourcesChangeConsumers = new ArrayList<>(); + private final List> resourceUpdateConsumers = new ArrayList<>(); + private final List>> promptsChangeConsumers = new ArrayList<>(); private final List> loggingConsumers = new ArrayList<>(); @@ -363,8 +365,8 @@ public SyncSpec loggingConsumers(List, Mono>> resourcesChangeConsumers = new ArrayList<>(); + private final List>> resourcesUpdateConsumers = new ArrayList<>(); + private final List, Mono>> promptsChangeConsumers = new ArrayList<>(); private final List>> loggingConsumers = new ArrayList<>(); @@ -605,8 +609,8 @@ public AsyncSpec loggingConsumers( public McpAsyncClient build() { return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout, new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots, - this.toolsChangeConsumers, this.resourcesChangeConsumers, this.promptsChangeConsumers, - this.loggingConsumers, this.samplingHandler)); + this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers, + this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler)); } } diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java index 284b93f8..4cffd5f9 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java @@ -64,6 +64,7 @@ class McpClientFeatures { record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities, Map roots, List, Mono>> toolsChangeConsumers, List, Mono>> resourcesChangeConsumers, + List>> resourcesUpdateConsumers, List, Mono>> promptsChangeConsumers, List>> loggingConsumers, Function> samplingHandler) { @@ -82,6 +83,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c Map roots, List, Mono>> toolsChangeConsumers, List, Mono>> resourcesChangeConsumers, + List>> resourcesUpdateConsumers, List, Mono>> promptsChangeConsumers, List>> loggingConsumers, Function> samplingHandler) { @@ -96,6 +98,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c this.toolsChangeConsumers = toolsChangeConsumers != null ? toolsChangeConsumers : List.of(); this.resourcesChangeConsumers = resourcesChangeConsumers != null ? resourcesChangeConsumers : List.of(); + this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of(); this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of(); this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of(); this.samplingHandler = samplingHandler; @@ -122,8 +125,13 @@ public static Async fromSync(Sync syncSpec) { .subscribeOn(Schedulers.boundedElastic())); } - List, Mono>> promptsChangeConsumers = new ArrayList<>(); + List>> resourceUpdateConsumers = new ArrayList<>(); + for (Consumer consumer : syncSpec.resourceUpdateConsumers()) { + resourceUpdateConsumers.add(r -> Mono.fromRunnable(() -> consumer.accept(r)) + .subscribeOn(Schedulers.boundedElastic())); + } + List, Mono>> promptsChangeConsumers = new ArrayList<>(); for (Consumer> consumer : syncSpec.promptsChangeConsumers()) { promptsChangeConsumers.add(p -> Mono.fromRunnable(() -> consumer.accept(p)) .subscribeOn(Schedulers.boundedElastic())); @@ -139,8 +147,8 @@ public static Async fromSync(Sync syncSpec) { .fromCallable(() -> syncSpec.samplingHandler().apply(r)) .subscribeOn(Schedulers.boundedElastic()); return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(), - toolsChangeConsumers, resourcesChangeConsumers, promptsChangeConsumers, loggingConsumers, - samplingHandler); + toolsChangeConsumers, resourcesChangeConsumers, resourceUpdateConsumers, promptsChangeConsumers, + loggingConsumers, samplingHandler); } } @@ -160,6 +168,7 @@ public static Async fromSync(Sync syncSpec) { public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities, Map roots, List>> toolsChangeConsumers, List>> resourcesChangeConsumers, + List> resourceUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, Function samplingHandler) { @@ -170,7 +179,8 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili * @param clientCapabilities the client capabilities. * @param roots the roots. * @param toolsChangeConsumers the tools change consumers. - * @param resourcesChangeConsumers the resources change consumers. + * @param resourcesChangeConsumers the resources change consumers. * @param + * resourceUpdateConsumers the resource update consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. * @param samplingHandler the sampling handler. @@ -178,6 +188,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities, Map roots, List>> toolsChangeConsumers, List>> resourcesChangeConsumers, + List> resourceUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, Function samplingHandler) { @@ -192,6 +203,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl this.toolsChangeConsumers = toolsChangeConsumers != null ? toolsChangeConsumers : List.of(); this.resourcesChangeConsumers = resourcesChangeConsumers != null ? resourcesChangeConsumers : List.of(); + this.resourceUpdateConsumers = resourceUpdateConsumers != null ? resourceUpdateConsumers : List.of(); this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of(); this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of(); this.samplingHandler = samplingHandler; diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java index 8df8a158..79187f42 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java @@ -66,6 +66,8 @@ private McpSchema() { public static final String METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED = "notifications/resources/list_changed"; + public static final String METHOD_NOTIFICATION_RESOURCES_UPDATED = "notifications/resources/updated"; + public static final String METHOD_RESOURCES_TEMPLATES_LIST = "resources/templates/list"; public static final String METHOD_RESOURCES_SUBSCRIBE = "resources/subscribe"; From 5e9b38f652d27a8acc7f81339df8e2b421b5d497 Mon Sep 17 00:00:00 2001 From: gongwn1 Date: Mon, 26 May 2025 12:44:22 +0800 Subject: [PATCH 2/5] add resourcesUpdateConsumer --- .../modelcontextprotocol/client/McpClient.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java index e297ff3a..a8f14746 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java @@ -557,6 +557,22 @@ public AsyncSpec resourcesChangeConsumer( return this; } + /** + * Adds a consumer to be notified when a specific resource is updated. This allows + * the client to react to changes in individual resources, such as updates to + * their content or metadata. + * @param resourcesUpdateConsumer A consumer function that processes the updated + * resource and returns a Mono indicating the completion of the processing. Must + * not be null. + * @return This builder instance for method chaining. + * @throws IllegalArgumentException If the resourcesUpdateConsumer is null. + */ + public AsyncSpec resourcesUpdateConsumer(Function> resourcesUpdateConsumer) { + Assert.notNull(resourcesUpdateConsumer, "Resources update consumer must not be null"); + this.resourcesUpdateConsumers.add(resourcesUpdateConsumer); + return this; + } + /** * Adds a consumer to be notified when the available prompts change. This allows * the client to react to changes in the server's prompt templates, such as new From bda41e1e0b6623015005b349b1ee0210f253e163 Mon Sep 17 00:00:00 2001 From: gongwn1 Date: Mon, 26 May 2025 13:16:51 +0800 Subject: [PATCH 3/5] fix: resource update handler --- .../client/McpAsyncClient.java | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index d6c8b34a..3cdbffcc 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -225,7 +225,7 @@ public class McpAsyncClient { } notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED, - asyncResourcesUpdatedNotificationHandler(resourcesChangeConsumersFinal)); + asyncResourcesUpdatedNotificationHandler(resourcesUpdateConsumersFinal)); // Prompts Change Notification List, Mono>> promptsChangeConsumersFinal = new ArrayList<>(); @@ -721,14 +721,20 @@ private NotificationHandler asyncResourcesChangeNotificationHandler( } private NotificationHandler asyncResourcesUpdatedNotificationHandler( - List, Mono>> resourcesUpdateConsumers) { - return params -> listResources().flatMap(listResourcesResult -> Flux.fromIterable(resourcesUpdateConsumers) - .flatMap(consumer -> consumer.apply(listResourcesResult.resources())) - .onErrorResume(error -> { - logger.error("Error handling resources updated notification", error); - return Mono.empty(); - }) - .then()); + List>> resourcesUpdateConsumers) { + return params -> { + McpSchema.Resource updatedResource = transport.unmarshalFrom(params, + new TypeReference() { + }); + + return Flux.fromIterable(resourcesUpdateConsumers) + .flatMap(consumer -> consumer.apply(updatedResource)) + .onErrorResume(error -> { + logger.error("Error handling resource update notification", error); + return Mono.empty(); + }) + .then(); + }; } // -------------------------- From 9c66abc015bc7eed178d692016f6b68dc29c5e50 Mon Sep 17 00:00:00 2001 From: gongwn1 Date: Mon, 26 May 2025 13:54:33 +0800 Subject: [PATCH 4/5] fix typo --- .../client/McpAsyncClient.java | 7 +++---- .../modelcontextprotocol/client/McpClient.java | 4 ++-- .../client/McpClientFeatures.java | 18 +++++++++--------- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index 3cdbffcc..0559cec6 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -215,7 +215,7 @@ public class McpAsyncClient { notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED, asyncResourcesChangeNotificationHandler(resourcesChangeConsumersFinal)); - // Resource Update Notification + // Resources Update Notification List>> resourcesUpdateConsumersFinal = new ArrayList<>(); resourcesUpdateConsumersFinal .add((notification) -> Mono.fromRunnable(() -> logger.debug("Resources updated: {}", notification))); @@ -723,9 +723,8 @@ private NotificationHandler asyncResourcesChangeNotificationHandler( private NotificationHandler asyncResourcesUpdatedNotificationHandler( List>> resourcesUpdateConsumers) { return params -> { - McpSchema.Resource updatedResource = transport.unmarshalFrom(params, - new TypeReference() { - }); + McpSchema.Resource updatedResource = transport.unmarshalFrom(params, new TypeReference<>() { + }); return Flux.fromIterable(resourcesUpdateConsumers) .flatMap(consumer -> consumer.apply(updatedResource)) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java index a8f14746..a945e6a9 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java @@ -169,7 +169,7 @@ class SyncSpec { private final List>> resourcesChangeConsumers = new ArrayList<>(); - private final List> resourceUpdateConsumers = new ArrayList<>(); + private final List> resourcesUpdateConsumers = new ArrayList<>(); private final List>> promptsChangeConsumers = new ArrayList<>(); @@ -365,7 +365,7 @@ public SyncSpec loggingConsumers(List>> resourceUpdateConsumers = new ArrayList<>(); - for (Consumer consumer : syncSpec.resourceUpdateConsumers()) { - resourceUpdateConsumers.add(r -> Mono.fromRunnable(() -> consumer.accept(r)) + List>> resourcesUpdateConsumers = new ArrayList<>(); + for (Consumer consumer : syncSpec.resourcesUpdateConsumers()) { + resourcesUpdateConsumers.add(r -> Mono.fromRunnable(() -> consumer.accept(r)) .subscribeOn(Schedulers.boundedElastic())); } @@ -147,7 +147,7 @@ public static Async fromSync(Sync syncSpec) { .fromCallable(() -> syncSpec.samplingHandler().apply(r)) .subscribeOn(Schedulers.boundedElastic()); return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(), - toolsChangeConsumers, resourcesChangeConsumers, resourceUpdateConsumers, promptsChangeConsumers, + toolsChangeConsumers, resourcesChangeConsumers, resourcesUpdateConsumers, promptsChangeConsumers, loggingConsumers, samplingHandler); } } @@ -168,7 +168,7 @@ public static Async fromSync(Sync syncSpec) { public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities, Map roots, List>> toolsChangeConsumers, List>> resourcesChangeConsumers, - List> resourceUpdateConsumers, + List> resourcesUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, Function samplingHandler) { @@ -179,8 +179,8 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili * @param clientCapabilities the client capabilities. * @param roots the roots. * @param toolsChangeConsumers the tools change consumers. - * @param resourcesChangeConsumers the resources change consumers. * @param - * resourceUpdateConsumers the resource update consumers. + * @param resourcesChangeConsumers the resources change consumers. + * @param resourcesUpdateConsumers the resource update consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. * @param samplingHandler the sampling handler. @@ -188,7 +188,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities, Map roots, List>> toolsChangeConsumers, List>> resourcesChangeConsumers, - List> resourceUpdateConsumers, + List> resourcesUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, Function samplingHandler) { @@ -203,7 +203,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl this.toolsChangeConsumers = toolsChangeConsumers != null ? toolsChangeConsumers : List.of(); this.resourcesChangeConsumers = resourcesChangeConsumers != null ? resourcesChangeConsumers : List.of(); - this.resourceUpdateConsumers = resourceUpdateConsumers != null ? resourceUpdateConsumers : List.of(); + this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of(); this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of(); this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of(); this.samplingHandler = samplingHandler; From 58a9541ca3c281729c3b571494ca26f0feda163b Mon Sep 17 00:00:00 2001 From: gongwn1 Date: Wed, 28 May 2025 15:29:14 +0800 Subject: [PATCH 5/5] feat: resource update server notify --- .../server/AbstractMcpAsyncServerTests.java | 12 ++++++++++ .../server/AbstractMcpSyncServerTests.java | 11 +++++++++ .../client/McpAsyncClient.java | 24 ++++++++++--------- .../client/McpClient.java | 7 +++--- .../client/McpClientFeatures.java | 12 +++++----- .../server/McpAsyncServer.java | 9 +++++++ .../server/McpSyncServer.java | 7 ++++++ .../modelcontextprotocol/spec/McpSchema.java | 11 +++++++++ .../server/AbstractMcpAsyncServerTests.java | 12 ++++++++++ .../server/AbstractMcpSyncServerTests.java | 11 +++++++++ 10 files changed, 96 insertions(+), 20 deletions(-) diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java index 025cfeac..12827f46 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java @@ -194,6 +194,18 @@ void testNotifyResourcesListChanged() { assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException(); } + @Test + void testNotifyResourcesUpdated() { + var mcpAsyncServer = McpServer.async(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build(); + + StepVerifier + .create(mcpAsyncServer + .notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI))) + .verifyComplete(); + + assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException(); + } + @Test void testAddResource() { var mcpAsyncServer = McpServer.async(createMcpTransportProvider()) diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java index e313454b..eefcdf9a 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java @@ -191,6 +191,17 @@ void testNotifyResourcesListChanged() { assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException(); } + @Test + void testNotifyResourcesUpdated() { + var mcpSyncServer = McpServer.sync(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build(); + + assertThatCode(() -> mcpSyncServer + .notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI))) + .doesNotThrowAnyException(); + + assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException(); + } + @Test void testAddResource() { var mcpSyncServer = McpServer.sync(createMcpTransportProvider()) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index 0559cec6..bd4d240b 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -216,7 +216,7 @@ public class McpAsyncClient { asyncResourcesChangeNotificationHandler(resourcesChangeConsumersFinal)); // Resources Update Notification - List>> resourcesUpdateConsumersFinal = new ArrayList<>(); + List, Mono>> resourcesUpdateConsumersFinal = new ArrayList<>(); resourcesUpdateConsumersFinal .add((notification) -> Mono.fromRunnable(() -> logger.debug("Resources updated: {}", notification))); @@ -721,18 +721,20 @@ private NotificationHandler asyncResourcesChangeNotificationHandler( } private NotificationHandler asyncResourcesUpdatedNotificationHandler( - List>> resourcesUpdateConsumers) { + List, Mono>> resourcesUpdateConsumers) { return params -> { - McpSchema.Resource updatedResource = transport.unmarshalFrom(params, new TypeReference<>() { - }); + McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification = transport.unmarshalFrom(params, + new TypeReference<>() { + }); - return Flux.fromIterable(resourcesUpdateConsumers) - .flatMap(consumer -> consumer.apply(updatedResource)) - .onErrorResume(error -> { - logger.error("Error handling resource update notification", error); - return Mono.empty(); - }) - .then(); + return readResource(new McpSchema.ReadResourceRequest(resourcesUpdatedNotification.uri())) + .flatMap(readResourceResult -> Flux.fromIterable(resourcesUpdateConsumers) + .flatMap(consumer -> consumer.apply(readResourceResult.contents())) + .onErrorResume(error -> { + logger.error("Error handling resource update notification", error); + return Mono.empty(); + }) + .then()); }; } diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java index a945e6a9..3c5b9f8a 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java @@ -169,7 +169,7 @@ class SyncSpec { private final List>> resourcesChangeConsumers = new ArrayList<>(); - private final List> resourcesUpdateConsumers = new ArrayList<>(); + private final List>> resourcesUpdateConsumers = new ArrayList<>(); private final List>> promptsChangeConsumers = new ArrayList<>(); @@ -410,7 +410,7 @@ class AsyncSpec { private final List, Mono>> resourcesChangeConsumers = new ArrayList<>(); - private final List>> resourcesUpdateConsumers = new ArrayList<>(); + private final List, Mono>> resourcesUpdateConsumers = new ArrayList<>(); private final List, Mono>> promptsChangeConsumers = new ArrayList<>(); @@ -567,7 +567,8 @@ public AsyncSpec resourcesChangeConsumer( * @return This builder instance for method chaining. * @throws IllegalArgumentException If the resourcesUpdateConsumer is null. */ - public AsyncSpec resourcesUpdateConsumer(Function> resourcesUpdateConsumer) { + public AsyncSpec resourcesUpdateConsumer( + Function, Mono> resourcesUpdateConsumer) { Assert.notNull(resourcesUpdateConsumer, "Resources update consumer must not be null"); this.resourcesUpdateConsumers.add(resourcesUpdateConsumer); return this; diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java index ab328f36..170915a9 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java @@ -64,7 +64,7 @@ class McpClientFeatures { record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities, Map roots, List, Mono>> toolsChangeConsumers, List, Mono>> resourcesChangeConsumers, - List>> resourcesUpdateConsumers, + List, Mono>> resourcesUpdateConsumers, List, Mono>> promptsChangeConsumers, List>> loggingConsumers, Function> samplingHandler) { @@ -83,7 +83,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c Map roots, List, Mono>> toolsChangeConsumers, List, Mono>> resourcesChangeConsumers, - List>> resourcesUpdateConsumers, + List, Mono>> resourcesUpdateConsumers, List, Mono>> promptsChangeConsumers, List>> loggingConsumers, Function> samplingHandler) { @@ -125,8 +125,8 @@ public static Async fromSync(Sync syncSpec) { .subscribeOn(Schedulers.boundedElastic())); } - List>> resourcesUpdateConsumers = new ArrayList<>(); - for (Consumer consumer : syncSpec.resourcesUpdateConsumers()) { + List, Mono>> resourcesUpdateConsumers = new ArrayList<>(); + for (Consumer> consumer : syncSpec.resourcesUpdateConsumers()) { resourcesUpdateConsumers.add(r -> Mono.fromRunnable(() -> consumer.accept(r)) .subscribeOn(Schedulers.boundedElastic())); } @@ -168,7 +168,7 @@ public static Async fromSync(Sync syncSpec) { public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities, Map roots, List>> toolsChangeConsumers, List>> resourcesChangeConsumers, - List> resourcesUpdateConsumers, + List>> resourcesUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, Function samplingHandler) { @@ -188,7 +188,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities, Map roots, List>> toolsChangeConsumers, List>> resourcesChangeConsumers, - List> resourcesUpdateConsumers, + List>> resourcesUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, Function samplingHandler) { diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java index 1efa13de..02ad955b 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java @@ -430,6 +430,15 @@ public Mono notifyResourcesListChanged() { return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED, null); } + /** + * Notifies clients that the resources have updated. + * @return A Mono that completes when all clients have been notified + */ + public Mono notifyResourcesUpdated(McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification) { + return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED, + resourcesUpdatedNotification); + } + private McpServerSession.RequestHandler resourcesListRequestHandler() { return (exchange, params) -> { var resourceList = this.resources.values() diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java index bf310450..91f8d9e4 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java @@ -141,6 +141,13 @@ public void notifyResourcesListChanged() { this.asyncServer.notifyResourcesListChanged().block(); } + /** + * Notify clients that the resources have updated. + */ + public void notifyResourcesUpdated(McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification) { + this.asyncServer.notifyResourcesUpdated(resourcesUpdatedNotification).block(); + } + /** * Notify clients that the list of available prompts has changed. */ diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java index 79187f42..ba9cc0f2 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java @@ -1113,6 +1113,17 @@ public record ProgressNotification(// @formatter:off @JsonProperty("total") Double total) { }// @formatter:on + /** + * The Model Context Protocol (MCP) provides a standardized way for servers to send + * resources update message to clients. + * + * @param uri The updated resource uri. + */ + @JsonIgnoreProperties(ignoreUnknown = true) + public record ResourcesUpdatedNotification(// @formatter:off + @JsonProperty("uri") String uri) { + }// @formatter:on + /** * The Model Context Protocol (MCP) provides a standardized way for servers to send * structured log messages to clients. Clients can control logging verbosity by diff --git a/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java b/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java index df0b0c72..dd9f6589 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java @@ -193,6 +193,18 @@ void testNotifyResourcesListChanged() { assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException(); } + @Test + void testNotifyResourcesUpdated() { + var mcpAsyncServer = McpServer.async(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build(); + + StepVerifier + .create(mcpAsyncServer + .notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI))) + .verifyComplete(); + + assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException(); + } + @Test void testAddResource() { var mcpAsyncServer = McpServer.async(createMcpTransportProvider()) diff --git a/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java b/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java index 0b38da85..6cbb8632 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java @@ -190,6 +190,17 @@ void testNotifyResourcesListChanged() { assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException(); } + @Test + void testNotifyResourcesUpdated() { + var mcpSyncServer = McpServer.sync(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build(); + + assertThatCode(() -> mcpSyncServer + .notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI))) + .doesNotThrowAnyException(); + + assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException(); + } + @Test void testAddResource() { var mcpSyncServer = McpServer.sync(createMcpTransportProvider())