diff --git a/client/pom.xml b/client/pom.xml new file mode 100644 index 000000000..668dc5030 --- /dev/null +++ b/client/pom.xml @@ -0,0 +1,30 @@ + + + 4.0.0 + + org.dromara.dynamictp + dynamic-tp-all + ${revision} + ../pom.xml + + dynamic-tp-client + + + + org.dromara.dynamictp + dynamic-tp-spring-boot-starter-common + + + + com.alipay.sofa + bolt + + + + com.alipay.sofa.common + sofa-common-tools + + + + diff --git a/client/src/main/java/org/dromara/dynamictp/client/AdminClient.java b/client/src/main/java/org/dromara/dynamictp/client/AdminClient.java new file mode 100644 index 000000000..081706854 --- /dev/null +++ b/client/src/main/java/org/dromara/dynamictp/client/AdminClient.java @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.client; + +import cn.hutool.core.lang.generator.SnowflakeGenerator; +import com.alipay.remoting.Connection; +import com.alipay.remoting.ConnectionEventType; +import com.alipay.remoting.config.Configs; +import com.alipay.remoting.exception.RemotingException; +import com.alipay.remoting.rpc.RpcClient; +import com.alipay.remoting.serialization.HessianSerializer; +import com.alipay.remoting.serialization.SerializerManager; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.client.selector.RandomAdminNodeSelector; +import org.dromara.dynamictp.client.selector.WeightedRoundRobinAdminNodeSelector; +import org.dromara.dynamictp.common.entity.AdminRequestBody; +import org.dromara.dynamictp.common.em.AdminRequestTypeEnum; +import org.dromara.dynamictp.client.processor.AdminClientUserProcessor; +import org.dromara.dynamictp.client.processor.AdminCloseEventProcessor; +import org.dromara.dynamictp.client.processor.AdminConnectEventProcessor; +import org.dromara.dynamictp.client.cluster.AdminClusterManager; +import org.dromara.dynamictp.client.node.AdminNode; +import org.dromara.dynamictp.client.selector.AdminNodeSelector; +import org.dromara.dynamictp.client.selector.RoundRobinAdminNodeSelector; +import org.springframework.beans.factory.annotation.Value; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * DynamicTp admin client + * + * @author eachann + */ +@Slf4j +public class AdminClient { + + @Value("${dynamictp.adminNodes:}") + private String adminNodes; + + @Value("${dynamictp.loadBalanceStrategy:roundRobin}") + private String loadBalanceStrategy; + + /** + * + * @param clientName the adminclient name + */ + @Setter + @Value("${dynamictp.clientName:${spring.application.name}}") + private String clientName; + + /** + * + * @param serviceName the adminclient service name + */ + @Setter + @Value("${dynamictp.serviceName:${spring.application.name}}") + private String serviceName; + + @Value("${dynamictp.adminEnabled:false}") + private Boolean adminEnabled; + + @Getter + private static final SnowflakeGenerator SNOWFLAKE_GENERATOR = new SnowflakeGenerator(); + + private final RpcClient client = new RpcClient(); + + @Getter + private static final HessianSerializer SERIALIZER = new HessianSerializer(); + + @Getter + @Setter + private static Connection connection; + + /** + * 集群管理器 + */ + private AdminClusterManager clusterManager; + + /** + * Connection state management + */ + private final AtomicBoolean isConnected = new AtomicBoolean(false); + private final AtomicInteger retryCount = new AtomicInteger(0); + private static final int MAX_RETRY_COUNT = 3; + private static final long RETRY_DELAY_MS = 1000; + + /** + * Heartbeat mechanism + */ + private static final long HEARTBEAT_INTERVAL_SECONDS = 30; + + private ScheduledExecutorService heartbeatExecutor; + + public AdminClient(AdminClientUserProcessor adminClientUserProcessor) { + this(adminClientUserProcessor, ""); + } + + public AdminClient(AdminClientUserProcessor adminClientUserProcessor, String clientName) { + this(adminClientUserProcessor, clientName, "", "", "", false); + } + + public AdminClient(AdminClientUserProcessor adminClientUserProcessor, String clientName, String serviceName, String adminNodes, String loadBalanceStrategy, Boolean adminEnabled) { + if (!clientName.isEmpty()) { + this.clientName = clientName; + } + if (!serviceName.isEmpty()) { + this.serviceName = serviceName; + } + if (!adminNodes.isEmpty()) { + this.adminNodes = adminNodes; + } + if (!loadBalanceStrategy.isEmpty()) { + this.loadBalanceStrategy = loadBalanceStrategy; + } + this.adminEnabled = adminEnabled; + + client.addConnectionEventProcessor(ConnectionEventType.CONNECT, new AdminConnectEventProcessor(this)); + client.addConnectionEventProcessor(ConnectionEventType.CLOSE, new AdminCloseEventProcessor(this)); + client.registerUserProcessor(adminClientUserProcessor); + client.enableReconnectSwitch(); + client.startup(); + SerializerManager.addSerializer(1, SERIALIZER); + System.setProperty(Configs.SERIALIZER, String.valueOf(SERIALIZER)); + } + + @PostConstruct + public void init() { + try { + initClusterManager(); + createConnection(); + startHeartbeat(); + } catch (Exception e) { + log.error("Failed to initialize AdminClient", e); + // 如果初始化失败,不要启动心跳,避免持续重试 + if (heartbeatExecutor != null) { + stopHeartbeat(); + } + throw new RuntimeException("AdminClient initialization failed", e); + } + } + + /** + * 初始化集群管理器 + */ + private void initClusterManager() { + // 创建节点选择器 + AdminNodeSelector selector = createNodeSelector(); + + // 创建集群管理器 + clusterManager = new AdminClusterManager(selector); + + // 添加配置的节点 + addConfiguredNodes(); + + log.info("Admin cluster manager initialized with {} nodes", clusterManager.getAllNodes().size()); + } + + /** + * 创建节点选择器 + */ + private AdminNodeSelector createNodeSelector() { + switch (loadBalanceStrategy.toLowerCase()) { + case "random": + return new RandomAdminNodeSelector(); + case "weighted": + return new WeightedRoundRobinAdminNodeSelector(); + case "roundRobin": + default: + return new RoundRobinAdminNodeSelector(); + } + } + + /** + * 添加配置的节点 + */ + private void addConfiguredNodes() { + log.info("Configuring admin nodes: adminNodes={}", adminNodes); + + if (adminNodes == null || adminNodes.trim().isEmpty()) { + log.error("No admin nodes configured. Please configure dynamictp.adminNodes property."); + throw new IllegalStateException("No admin nodes configured"); + } + + String[] nodeConfigs = adminNodes.split(","); + log.info("Parsed {} node configurations", nodeConfigs.length); + + for (String nodeConfig : nodeConfigs) { + String trimmedConfig = nodeConfig.trim(); + if (trimmedConfig.isEmpty()) { + log.warn("Skipping empty node configuration"); + continue; + } + + String[] parts = trimmedConfig.split(":"); + if (parts.length >= 2) { + try { + String ip = parts[0].trim(); + int port = Integer.parseInt(parts[1].trim()); + int weight = parts.length > 2 ? Integer.parseInt(parts[2].trim()) : 1; + + if (ip.isEmpty()) { + log.error("Invalid IP address in node configuration: {}", trimmedConfig); + throw new IllegalArgumentException("Invalid IP address in node configuration: " + trimmedConfig); + } + + clusterManager.addNode(ip, port, weight); + log.info("Added admin node: {}:{} with weight {}", ip, port, weight); + } catch (NumberFormatException e) { + log.error("Invalid admin node configuration: {}", trimmedConfig, e); + throw new IllegalArgumentException("Invalid admin node configuration: " + trimmedConfig, e); + } + } else { + log.error("Invalid admin node configuration format: {}", trimmedConfig); + throw new IllegalArgumentException("Invalid admin node configuration format: " + trimmedConfig); + } + } + + if (clusterManager.getAllNodes().isEmpty()) { + log.error("No valid admin nodes found in configuration"); + throw new IllegalStateException("No valid admin nodes found in configuration"); + } + + log.info("Successfully configured {} admin nodes", clusterManager.getAllNodes().size()); + } + + /** + * Start heartbeat mechanism if admin is enabled + */ + private void startHeartbeat() { + if (adminEnabled) { + heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread thread = new Thread(r, "DynamicTp-AdminClient-Heartbeat"); + thread.setDaemon(true); + return thread; + }); + + heartbeatExecutor.scheduleAtFixedRate(() -> { + try { + ensureConnection(); + } catch (Exception e) { + log.warn("DynamicTp admin client heartbeat execution failed", e); + } + }, HEARTBEAT_INTERVAL_SECONDS, HEARTBEAT_INTERVAL_SECONDS, TimeUnit.SECONDS); + } + } + + /** + * Stop heartbeat mechanism + */ + private void stopHeartbeat() { + if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) { + try { + heartbeatExecutor.shutdown(); + if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + heartbeatExecutor.shutdownNow(); + } + log.info("DynamicTp admin client heartbeat stopped"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + heartbeatExecutor.shutdownNow(); + log.warn("DynamicTp admin client heartbeat shutdown interrupted"); + } + } + } + + public Object requestToServer(AdminRequestTypeEnum requestType) { + if (!ensureConnection()) { + log.error("DynamicTp admin client cannot establish connection after retries"); + return null; + } + AdminRequestBody requestBody = new AdminRequestBody(SNOWFLAKE_GENERATOR.next(), requestType); + requestBody.setAttributes("clientName", clientName); + requestBody.setAttributes("serviceName", serviceName); + Object object = null; + try { + object = client.invokeSync(connection, requestBody, 30000); + // 标记当前节点成功 + AdminNode currentNode = getCurrentNode(); + if (currentNode != null) { + clusterManager.markNodeSuccess(currentNode); + } + } catch (RemotingException | InterruptedException e) { + log.warn("DynamicTp admin client invoke failed, exception:", e); + // 标记当前节点失败 + AdminNode currentNode = getCurrentNode(); + if (currentNode != null) { + clusterManager.markNodeFailed(currentNode); + } + isConnected.set(false); + } + if (object instanceof IllegalStateException) { + log.error(((IllegalStateException) object).getMessage()); + return null; + } + return object; + } + + public Object requestToServer(AdminRequestBody requestBody) { + if (!ensureConnection()) { + log.error("DynamicTp admin client cannot establish connection after retries"); + return null; + } + requestBody.setAttributes("clientName", clientName); + requestBody.setAttributes("serviceName", serviceName); + Object object = null; + try { + object = client.invokeSync(connection, requestBody, 5000); + // 标记当前节点成功 + AdminNode currentNode = getCurrentNode(); + if (currentNode != null) { + clusterManager.markNodeSuccess(currentNode); + } + } catch (RemotingException | InterruptedException e) { + log.warn("DynamicTp admin client invoke failed, exception:", e); + // 标记当前节点失败 + AdminNode currentNode = getCurrentNode(); + if (currentNode != null) { + clusterManager.markNodeFailed(currentNode); + } + // Mark connection as disconnected when request fails + isConnected.set(false); + } + if (object instanceof IllegalStateException) { + log.error(((IllegalStateException) object).getMessage()); + return null; + } + return object; + } + + /** + * 获取当前连接的节点 + */ + private AdminNode getCurrentNode() { + if (connection != null) { + String address = connection.getRemoteAddress().getAddress().getHostAddress(); + int port = connection.getRemoteAddress().getPort(); + + for (AdminNode node : clusterManager.getAllNodes()) { + if (node.getIp().equals(address) && node.getPort() == port) { + return node; + } + } + } + return null; + } + + /** + * Ensure connection is available, try to reconnect if not available + * + * @return whether connection is available + */ + private boolean ensureConnection() { + // Check connection status + if (isConnected.get() && connection != null && connection.isFine()) { + // send health message to server + AdminRequestBody requestBody = new AdminRequestBody(SNOWFLAKE_GENERATOR.next(), AdminRequestTypeEnum.EXECUTOR_MONITOR); + requestBody.setAttributes("clientName", clientName); + requestBody.setAttributes("serviceName", serviceName); + try { + client.invokeSync(connection, requestBody, 30000); + } catch (RemotingException | InterruptedException e) { + log.warn("DynamicTp admin client invoke failed, exception:", e); + // 标记当前节点失败 + AdminNode currentNode = getCurrentNode(); + if (currentNode != null) { + clusterManager.markNodeFailed(currentNode); + } + isConnected.set(false); + } + return true; + } + + // Connection not available, try to reconnect + return reconnectWithRetry(); + } + + /** + * Reconnect with retry mechanism + * + * @return whether reconnection is successful + */ + private boolean reconnectWithRetry() { + int currentRetry = 0; + while (currentRetry < MAX_RETRY_COUNT) { + log.info("DynamicTp admin client attempting to reconnect, attempt: {}/{}", currentRetry + 1, + MAX_RETRY_COUNT); + + if (createConnection()) { + isConnected.set(true); + retryCount.set(0); + log.info("DynamicTp admin client reconnected successfully"); + return true; + } + + currentRetry++; + retryCount.incrementAndGet(); + + if (currentRetry < MAX_RETRY_COUNT) { + try { + // Incremental delay + Thread.sleep(RETRY_DELAY_MS * currentRetry); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("DynamicTp admin client reconnection interrupted"); + return false; + } + } + } + + log.error("DynamicTp admin client failed to reconnect after {} attempts", MAX_RETRY_COUNT); + return false; + } + + private boolean createConnection() { + try { + // 检查集群管理器是否已初始化 + if (clusterManager == null) { + log.error("Cluster manager not initialized"); + return false; + } + + // Clean up old connection + if (connection != null) { + try { + client.closeStandaloneConnection(connection); + } catch (Exception e) { + log.debug("Error closing old connection", e); + } + connection = null; + } + + AdminNode selectedNode = clusterManager.selectNode(null); + if (selectedNode == null) { + log.error("No available admin nodes for connection"); + return false; + } + + connection = client.createStandaloneConnection(selectedNode.getAddress(), 30000); + if (connection != null && connection.isFine()) { + log.info("DynamicTp admin client connection created successfully, admin node: {}", + selectedNode.getAddress()); + AdminRequestBody requestBody = new AdminRequestBody(SNOWFLAKE_GENERATOR.next(), + AdminRequestTypeEnum.EXECUTOR_REFRESH); + requestBody.setAttributes("clientName", clientName); + requestBody.setAttributes("serviceName", serviceName); + Object object = client.invokeSync(connection, requestBody, 5000); + if (object instanceof IllegalStateException) { + log.error(((IllegalStateException) object).getMessage()); + client.closeStandaloneConnection(connection); + return false; + } + return true; + } else { + log.warn("DynamicTp admin client connection created but not fine, admin node: {}", + selectedNode.getAddress()); + return false; + } + } catch (RemotingException | InterruptedException e) { + log.error("DynamicTp admin create connection failed", e); + return false; + } + } + + /** + * Get current connection status + * + * @return whether connection is available + */ + public boolean isConnected() { + return isConnected.get() && connection != null && connection.isFine(); + } + + /** + * Get retry count + * + * @return retry count + */ + public int getRetryCount() { + return retryCount.get(); + } + + /** + * Update connection status + * + * @param connected whether connected + */ + public void updateConnectionStatus(boolean connected) { + isConnected.set(connected); + if (!connected) { + retryCount.incrementAndGet(); + } + } + + /** + * 获取集群管理器 + * @return clusterManager + */ + public AdminClusterManager getClusterManager() { + return clusterManager; + } + + /** + * 获取所有admin节点 + * @return allAdminNodes + */ + public List getAllAdminNodes() { + return clusterManager != null ? clusterManager.getAllNodes() : null; + } + + /** + * 获取健康的admin节点 + * @return healthyAdminNodes + */ + public List getHealthyAdminNodes() { + return clusterManager != null ? clusterManager.getHealthyNodes() : null; + } + + @PreDestroy + public void close() { + stopHeartbeat(); + if (clusterManager != null) { + clusterManager.shutdown(); + } + isConnected.set(false); + if (connection != null) { + try { + client.closeStandaloneConnection(connection); + } catch (Exception e) { + log.warn("Error closing connection", e); + } + connection = null; + } + client.shutdown(); + } +} diff --git a/client/src/main/java/org/dromara/dynamictp/client/autoconfigure/AdminAutoConfiguration.java b/client/src/main/java/org/dromara/dynamictp/client/autoconfigure/AdminAutoConfiguration.java new file mode 100644 index 000000000..2e0bd5521 --- /dev/null +++ b/client/src/main/java/org/dromara/dynamictp/client/autoconfigure/AdminAutoConfiguration.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.client.autoconfigure; + +import org.dromara.dynamictp.common.properties.DtpProperties; +import org.dromara.dynamictp.client.AdminClient; +import org.dromara.dynamictp.client.handler.refresh.AdminRefresher; +import org.dromara.dynamictp.client.processor.AdminClientUserProcessor; +import org.dromara.dynamictp.client.processor.AdminCloseEventProcessor; +import org.dromara.dynamictp.client.processor.AdminConnectEventProcessor; +import org.dromara.dynamictp.spring.DtpBaseBeanConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * AdminAutoConfiguration related + * + * @author eachann + */ +@Configuration +@ConditionalOnBean({ DtpBaseBeanConfiguration.class }) +@AutoConfigureAfter({ DtpBaseBeanConfiguration.class }) +public class AdminAutoConfiguration { + + @Bean + public AdminClient adminClient(AdminClientUserProcessor adminClientUserProcessor) { + return new AdminClient(adminClientUserProcessor); + } + + @Bean + @ConditionalOnMissingBean + public AdminRefresher adminRefresher(DtpProperties dtpProperties) { + return new AdminRefresher(dtpProperties); + } + + @Bean + public AdminClientUserProcessor adminClientUserProcessor(AdminRefresher adminRefresher) { + return new AdminClientUserProcessor(adminRefresher); + } + + @Bean + public AdminConnectEventProcessor adminConnectEventProcessor(AdminClient adminClient) { + return new AdminConnectEventProcessor(adminClient); + } + + @Bean + public AdminCloseEventProcessor adminCloseEventProcessor(AdminClient adminClient) { + return new AdminCloseEventProcessor(adminClient); + } + +} diff --git a/client/src/main/java/org/dromara/dynamictp/client/autoconfigure/AdminConfigEnvironmentProcessor.java b/client/src/main/java/org/dromara/dynamictp/client/autoconfigure/AdminConfigEnvironmentProcessor.java new file mode 100644 index 000000000..d95987010 --- /dev/null +++ b/client/src/main/java/org/dromara/dynamictp/client/autoconfigure/AdminConfigEnvironmentProcessor.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.client.autoconfigure; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.common.em.AdminRequestTypeEnum; +import org.dromara.dynamictp.common.properties.DtpProperties; +import org.dromara.dynamictp.core.support.binder.BinderHelper; +import org.dromara.dynamictp.client.AdminClient; +import org.dromara.dynamictp.client.processor.AdminClientUserProcessor; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.env.EnvironmentPostProcessor; +import org.springframework.boot.env.OriginTrackedMapPropertySource; +import org.springframework.core.Ordered; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.MutablePropertySources; + +import java.util.Map; + +/** + * ZkConfigEnvironmentProcessor related + * + * @author yanhom + * @since 1.0.4 + **/ +@Slf4j +public class AdminConfigEnvironmentProcessor implements EnvironmentPostProcessor, Ordered { + + public AdminConfigEnvironmentProcessor() { + } + + public static final String ADMIN_PROPERTY_SOURCE_NAME = "dtpAdminPropertySource"; + + @Override + public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) { + // 先绑定配置属性 + DtpProperties dtpProperties = DtpProperties.getInstance(); + BinderHelper.bindDtpProperties(environment, dtpProperties); + + // 从 Environment 中直接获取 clientName 配置 + String clientName = environment.getProperty("dynamictp.clientName", + environment.getProperty("spring.application.name")); + String serviceName = environment.getProperty("dynamictp.serviceName", + environment.getProperty("spring.application.name")); + String adminNodes = environment.getProperty("dynamictp.adminNodes"); + String loadBalanceStrategy = environment.getProperty("dynamictp.loadBalanceStrategy", "roundRobin"); + Boolean adminEnabled = Boolean.parseBoolean(environment.getProperty("dynamictp.adminEnabled", "false")); + + // 创建 AdminClient 时传入配置的 clientName + AdminClient adminClient = new AdminClient(new AdminClientUserProcessor(), clientName, serviceName, adminNodes, loadBalanceStrategy, adminEnabled); + adminClient.init(); + Map properties = (Map) adminClient + .requestToServer(AdminRequestTypeEnum.EXECUTOR_REFRESH); + if (!checkPropertyExist(environment) && properties != null) { + createAdminPropertySource(environment, properties); + } + adminClient.close(); + } + + private boolean checkPropertyExist(ConfigurableEnvironment environment) { + MutablePropertySources propertySources = environment.getPropertySources(); + return propertySources.stream().anyMatch(p -> ADMIN_PROPERTY_SOURCE_NAME.equals(p.getName())); + } + + private void createAdminPropertySource(ConfigurableEnvironment environment, Map properties) { + MutablePropertySources propertySources = environment.getPropertySources(); + OriginTrackedMapPropertySource adminSource = new OriginTrackedMapPropertySource(ADMIN_PROPERTY_SOURCE_NAME, + properties); + propertySources.addLast(adminSource); + } + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } +} diff --git a/client/src/main/java/org/dromara/dynamictp/client/cluster/AdminClusterManager.java b/client/src/main/java/org/dromara/dynamictp/client/cluster/AdminClusterManager.java new file mode 100644 index 000000000..4ea6bfdfb --- /dev/null +++ b/client/src/main/java/org/dromara/dynamictp/client/cluster/AdminClusterManager.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.client.cluster; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.client.node.AdminNode; +import org.dromara.dynamictp.client.selector.AdminNodeSelector; +import org.dromara.dynamictp.client.selector.RoundRobinAdminNodeSelector; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Admin集群管理器 + * + * @author eachann + * @since 1.2.3 + */ +@Slf4j +public class AdminClusterManager { + + private final List adminNodes = new CopyOnWriteArrayList<>(); + private final AdminNodeSelector nodeSelector; + private final ScheduledExecutorService healthCheckExecutor; + + private static final long HEALTH_CHECK_INTERVAL = 30000; // 30秒 + private static final int MAX_FAIL_COUNT = 3; + + public AdminClusterManager() { + this(new RoundRobinAdminNodeSelector()); + } + + public AdminClusterManager(AdminNodeSelector nodeSelector) { + this.nodeSelector = nodeSelector; + this.healthCheckExecutor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread thread = new Thread(r, "DynamicTp-AdminCluster-HealthCheck"); + thread.setDaemon(true); + return thread; + }); + + startHealthCheck(); + } + + /** + * 添加admin节点 + * + * @param ip 节点IP + * @param port 节点端口 + */ + public void addNode(String ip, int port) { + addNode(ip, port, 1); + } + + /** + * 添加admin节点 + * + * @param ip 节点IP + * @param port 节点端口 + * @param weight 节点权重 + */ + public void addNode(String ip, int port, int weight) { + AdminNode node = new AdminNode(ip, port, weight); + if (!adminNodes.contains(node)) { + adminNodes.add(node); + log.info("Added admin node: {}:{} with weight {}", ip, port, weight); + } + } + + /** + * 移除admin节点 + * + * @param ip 节点IP + * @param port 节点端口 + */ + public void removeNode(String ip, int port) { + adminNodes.removeIf(node -> node.getIp().equals(ip) && node.getPort() == port); + log.info("Removed admin node: {}:{}", ip, port); + } + + /** + * 选择admin节点 + * + * @param arg 选择参数 + * @return 选中的admin节点 + */ + public AdminNode selectNode(Object arg) { + if (adminNodes.isEmpty()) { + log.warn("No admin nodes available"); + return null; + } + + AdminNode selectedNode = nodeSelector.select(adminNodes); + if (selectedNode != null) { + log.debug("Selected admin node: {}", selectedNode.getAddress()); + } + return selectedNode; + } + + /** + * 获取所有admin节点 + * + * @return admin节点列表 + */ + public List getAllNodes() { + return new ArrayList<>(adminNodes); + } + + /** + * 获取健康的admin节点 + * + * @return 健康的admin节点列表 + */ + public List getHealthyNodes() { + return adminNodes.stream() + .filter(AdminNode::isAvailable) + .collect(java.util.stream.Collectors.toList()); + } + + /** + * 标记节点失败 + * + * @param node 失败的节点 + */ + public void markNodeFailed(AdminNode node) { + if (node != null) { + node.markFailed(); + log.warn("Marked admin node as failed: {}", node.getAddress()); + } + } + + /** + * 标记节点成功 + * + * @param node 成功的节点 + */ + public void markNodeSuccess(AdminNode node) { + if (node != null) { + node.markSuccess(); + log.debug("Marked admin node as success: {}", node.getAddress()); + } + } + + /** + * 启动健康检查 + */ + private void startHealthCheck() { + healthCheckExecutor.scheduleAtFixedRate(() -> { + try { + performHealthCheck(); + } catch (Exception e) { + log.warn("Health check execution failed", e); + } + }, HEALTH_CHECK_INTERVAL, HEALTH_CHECK_INTERVAL, TimeUnit.MILLISECONDS); + } + + /** + * 执行健康检查 + */ + private void performHealthCheck() { + for (AdminNode node : adminNodes) { + boolean healthy = node.isHealthy(MAX_FAIL_COUNT, HEALTH_CHECK_INTERVAL); + if (!healthy && node.isAvailable()) { + log.warn("Admin node {} is marked as unhealthy", node.getAddress()); + } + } + } + + /** + * 关闭集群管理器 + */ + public void shutdown() { + if (healthCheckExecutor != null && !healthCheckExecutor.isShutdown()) { + try { + healthCheckExecutor.shutdown(); + if (!healthCheckExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + healthCheckExecutor.shutdownNow(); + } + log.info("Admin cluster manager health check stopped"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + healthCheckExecutor.shutdownNow(); + log.warn("Admin cluster manager shutdown interrupted"); + } + } + } +} + diff --git a/client/src/main/java/org/dromara/dynamictp/client/handler/collector/AdminCollector.java b/client/src/main/java/org/dromara/dynamictp/client/handler/collector/AdminCollector.java new file mode 100644 index 000000000..d50aaa10c --- /dev/null +++ b/client/src/main/java/org/dromara/dynamictp/client/handler/collector/AdminCollector.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.client.handler.collector; + +import org.dromara.dynamictp.common.em.CollectorTypeEnum; +import org.dromara.dynamictp.common.entity.ThreadPoolStats; +import org.dromara.dynamictp.core.monitor.collector.AbstractCollector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * AdminCollector for collecting thread pool statistics + * Enhanced to support active collection and multiple thread pools + * + * @author eachann + */ +public class AdminCollector extends AbstractCollector { + + private static final Map MULTI_POOL_STATS = new ConcurrentHashMap(); + + public static List getMultiPoolStats() { + return new ArrayList<>(MULTI_POOL_STATS.values()); + } + + @Override + public void collect(ThreadPoolStats poolStats) { + MULTI_POOL_STATS.put(poolStats.getPoolName(), poolStats); + } + + @Override + public String type() { + return CollectorTypeEnum.ADMIN.name().toLowerCase(); + } +} diff --git a/client/src/main/java/org/dromara/dynamictp/client/handler/refresh/AdminRefresher.java b/client/src/main/java/org/dromara/dynamictp/client/handler/refresh/AdminRefresher.java new file mode 100644 index 000000000..ed2713aac --- /dev/null +++ b/client/src/main/java/org/dromara/dynamictp/client/handler/refresh/AdminRefresher.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.client.handler.refresh; + + +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.common.properties.DtpProperties; +import org.dromara.dynamictp.spring.AbstractSpringRefresher; +import org.springframework.beans.factory.InitializingBean; + +import java.util.Map; + + +/** + * AdminRefresher related + * + * @author eachann + */ +@Slf4j +public class AdminRefresher extends AbstractSpringRefresher implements InitializingBean { + + public AdminRefresher(DtpProperties dtpProperties) { + super(dtpProperties); + } + + @Override + public void refresh(Map properties) { + log.info("Dynamic-tp adminRefresher refresh properties"); + super.refresh(properties); + } + + @Override + public void afterPropertiesSet() { + + } +} diff --git a/client/src/main/java/org/dromara/dynamictp/client/node/AdminNode.java b/client/src/main/java/org/dromara/dynamictp/client/node/AdminNode.java new file mode 100644 index 000000000..925d530f9 --- /dev/null +++ b/client/src/main/java/org/dromara/dynamictp/client/node/AdminNode.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.client.node; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * Admin节点信息 + * + * @author eachann + * @since 1.2.3 + */ +@Data +@EqualsAndHashCode +public class AdminNode { + + /** + * 节点IP地址 + */ + private String ip; + + /** + * 节点端口 + */ + private int port; + + /** + * 节点权重,用于加权负载均衡 + */ + private int weight = 1; + + /** + * 节点是否可用 + */ + private boolean available = true; + + /** + * 节点最后心跳时间 + */ + private long lastHeartbeatTime; + + /** + * 节点连接失败次数 + */ + private int failCount = 0; + + public AdminNode(String ip, int port) { + this.ip = ip; + this.port = port; + } + + public AdminNode(String ip, int port, int weight) { + this.ip = ip; + this.port = port; + this.weight = weight; + } + + /** + * 获取节点地址字符串 + * + * @return 节点地址 + */ + public String getAddress() { + return ip + ":" + port; + } + + /** + * 标记节点失败 + */ + public void markFailed() { + this.failCount++; + this.lastHeartbeatTime = System.currentTimeMillis(); + } + + /** + * 标记节点成功 + */ + public void markSuccess() { + this.failCount = 0; + this.lastHeartbeatTime = System.currentTimeMillis(); + this.available = true; + } + + /** + * 检查节点是否健康 + * + * @param maxFailCount 最大失败次数 + * @param healthCheckInterval 健康检查间隔(毫秒) + * @return 是否健康 + */ + public boolean isHealthy(int maxFailCount, long healthCheckInterval) { + if (failCount >= maxFailCount) { + this.available = false; + return false; + } + + if (System.currentTimeMillis() - lastHeartbeatTime > healthCheckInterval) { + this.available = false; + return false; + } + + return this.available; + } +} + diff --git a/client/src/main/java/org/dromara/dynamictp/client/processor/AdminClientUserProcessor.java b/client/src/main/java/org/dromara/dynamictp/client/processor/AdminClientUserProcessor.java new file mode 100644 index 000000000..fc8177c39 --- /dev/null +++ b/client/src/main/java/org/dromara/dynamictp/client/processor/AdminClientUserProcessor.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.client.processor; + +import com.alipay.remoting.BizContext; +import com.alipay.remoting.rpc.protocol.SyncUserProcessor; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.common.entity.AdminRequestBody; +import org.dromara.dynamictp.client.handler.collector.AdminCollector; +import org.dromara.dynamictp.client.handler.refresh.AdminRefresher; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; + +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * AdminClientUserProcessor related + * + * @author eachann + */ +@Slf4j +public class AdminClientUserProcessor extends SyncUserProcessor { + + private final ExecutorService executor; + + @Getter + private String remoteAddress = "NOT-CONNECT"; + + private AdminRefresher adminRefresher; + + /** + * + * @param clientName the adminclient name + */ + @Setter + @Value("${dynamictp.clientName:${spring.application.name}}") + private String clientName; + + /** + * + * @param serviceName the adminclient service name + */ + @Setter + @Value("${dynamictp.serviceName:${spring.application.name}}") + private String serviceName; + + + public AdminClientUserProcessor() { + this.executor = Executors.newSingleThreadExecutor(); + } + + @Autowired + public AdminClientUserProcessor(AdminRefresher adminRefresher) { + this.executor = Executors.newSingleThreadExecutor(); + this.adminRefresher = adminRefresher; + } + + @Override + public Object handleRequest(BizContext bizContext, AdminRequestBody adminRequestBody) { + log.info("DynamicTp admin request received:{}", adminRequestBody.getRequestType().getValue()); + if (bizContext.isRequestTimeout()) { + log.warn("DynamicTp admin request timeout:{}s", bizContext.getClientTimeout()); + } + adminRequestBody.setAttributes("clientName", clientName); + adminRequestBody.setAttributes("serviceName", serviceName); + this.remoteAddress = bizContext.getRemoteAddress(); + return doHandleRequest(adminRequestBody); + } + + private Object doHandleRequest(AdminRequestBody adminRequestBody) { + switch (adminRequestBody.getRequestType()) { + case EXECUTOR_MONITOR: + return handleExecutorMonitorRequest(adminRequestBody); + case EXECUTOR_REFRESH: + return handleExecutorRefreshRequest(adminRequestBody); + case ALARM_MANAGE: + return handleAlarmManageRequest(adminRequestBody); + case LOG_MANAGE: + return handleLogManageRequest(adminRequestBody); + default: + throw new IllegalArgumentException("DynamicTp admin request type " + + adminRequestBody.getRequestType().getValue() + " is not supported"); + } + } + + @Override + public String interest() { + return AdminRequestBody.class.getName(); + } + + @Override + public Executor getExecutor() { + return executor; + } + + private Object handleExecutorMonitorRequest(AdminRequestBody adminRequestBody) { + adminRequestBody.setBody(AdminCollector.getMultiPoolStats()); + return adminRequestBody; + } + + private Object handleExecutorRefreshRequest(AdminRequestBody adminRequestBody) { + Object properties = adminRequestBody.getBody(); + if (properties == null) { + log.error("DynamicTp admin executor refresh failed, properties is null"); + return null; + } + adminRefresher.refresh((Map) properties); + return adminRequestBody; + } + + private Object handleAlarmManageRequest(AdminRequestBody adminRequestBody) { + + return adminRequestBody; + } + + private Object handleLogManageRequest(AdminRequestBody adminRequestBody) { + + return adminRequestBody; + } +} diff --git a/client/src/main/java/org/dromara/dynamictp/client/processor/AdminCloseEventProcessor.java b/client/src/main/java/org/dromara/dynamictp/client/processor/AdminCloseEventProcessor.java new file mode 100644 index 000000000..5a172fc91 --- /dev/null +++ b/client/src/main/java/org/dromara/dynamictp/client/processor/AdminCloseEventProcessor.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.client.processor; + +import com.alipay.remoting.Connection; +import com.alipay.remoting.ConnectionEventProcessor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.client.AdminClient; + +/** + * AdminCloseEventProcessor related + * + * @author eachann + */ +@Slf4j +public class AdminCloseEventProcessor implements ConnectionEventProcessor { + + private final AdminClient adminClient; + + public AdminCloseEventProcessor(AdminClient adminClient) { + this.adminClient = adminClient; + } + + @Override + public void onEvent(String remoteAddress, Connection connection) { + log.info("DynamicTp admin client is disconnected, admin ip: {}, port: {}", connection.getRemoteAddress(), + connection.getRemotePort()); + // Clean up connection object and update status when connection is closed + AdminClient.setConnection(null); + adminClient.updateConnectionStatus(false); + } +} diff --git a/client/src/main/java/org/dromara/dynamictp/client/processor/AdminConnectEventProcessor.java b/client/src/main/java/org/dromara/dynamictp/client/processor/AdminConnectEventProcessor.java new file mode 100644 index 000000000..770f955ab --- /dev/null +++ b/client/src/main/java/org/dromara/dynamictp/client/processor/AdminConnectEventProcessor.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.client.processor; + +import com.alipay.remoting.Connection; +import com.alipay.remoting.ConnectionEventProcessor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.client.AdminClient; + +/** + * AdminConnectEventProcessor related + * + * @author eachann + */ +@Slf4j +public class AdminConnectEventProcessor implements ConnectionEventProcessor { + + private final AdminClient adminClient; + + public AdminConnectEventProcessor(AdminClient adminClient) { + this.adminClient = adminClient; + } + + @Override + public void onEvent(String remoteAddress, Connection connection) { + AdminClient.setConnection(connection); + adminClient.updateConnectionStatus(true); + log.info("DynamicTp admin client connected, admin address: {}", remoteAddress); + } +} diff --git a/client/src/main/java/org/dromara/dynamictp/client/selector/AdminNodeSelector.java b/client/src/main/java/org/dromara/dynamictp/client/selector/AdminNodeSelector.java new file mode 100644 index 000000000..cc9d8399d --- /dev/null +++ b/client/src/main/java/org/dromara/dynamictp/client/selector/AdminNodeSelector.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.client.selector; + +import org.dromara.dynamictp.client.node.AdminNode; + +import java.util.List; + +/** + * Admin节点选择器接口,用于负载均衡选择admin节点 + * + * @author eachann + * @since 1.2.3 + */ +public interface AdminNodeSelector { + + /** + * 选择admin节点 + * + * @param nodes 可用的admin节点列表 + * @return 选中的admin节点 + */ + AdminNode select(List nodes); +} + diff --git a/client/src/main/java/org/dromara/dynamictp/client/selector/RandomAdminNodeSelector.java b/client/src/main/java/org/dromara/dynamictp/client/selector/RandomAdminNodeSelector.java new file mode 100644 index 000000000..683274f8d --- /dev/null +++ b/client/src/main/java/org/dromara/dynamictp/client/selector/RandomAdminNodeSelector.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.client.selector; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.client.node.AdminNode; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +/** + * 随机选择器实现 + * + * @author eachann + * @since 1.2.3 + */ +@Slf4j +public class RandomAdminNodeSelector implements AdminNodeSelector { + + @Override + public AdminNode select(List nodes) { + if (nodes == null || nodes.isEmpty()) { + log.warn("No available admin nodes for selection"); + return null; + } + + // 过滤出健康的节点 + List healthyNodes = nodes.stream() + .filter(AdminNode::isAvailable) + .collect(Collectors.toList()); + + if (healthyNodes.isEmpty()) { + log.warn("No healthy admin nodes available, using all nodes"); + healthyNodes = nodes; + } + + int index = ThreadLocalRandom.current().nextInt(healthyNodes.size()); + AdminNode selectedNode = healthyNodes.get(index); + + log.debug("Random selector selected admin node: {}", selectedNode.getAddress()); + return selectedNode; + } +} + diff --git a/client/src/main/java/org/dromara/dynamictp/client/selector/RoundRobinAdminNodeSelector.java b/client/src/main/java/org/dromara/dynamictp/client/selector/RoundRobinAdminNodeSelector.java new file mode 100644 index 000000000..522de5a5f --- /dev/null +++ b/client/src/main/java/org/dromara/dynamictp/client/selector/RoundRobinAdminNodeSelector.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.client.selector; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.client.node.AdminNode; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** + * 轮询选择器实现 + * + * @author eachann + * @since 1.2.3 + */ +@Slf4j +public class RoundRobinAdminNodeSelector implements AdminNodeSelector { + + private final AtomicInteger counter = new AtomicInteger(0); + + @Override + public AdminNode select(List nodes) { + if (nodes == null || nodes.isEmpty()) { + log.warn("No available admin nodes for selection"); + return null; + } + + // 过滤出健康的节点 + List healthyNodes = nodes.stream() + .filter(AdminNode::isAvailable) + .collect(Collectors.toList()); + + if (healthyNodes.isEmpty()) { + log.warn("No healthy admin nodes available, using all nodes"); + healthyNodes = nodes; + } + + int index = counter.getAndIncrement() % healthyNodes.size(); + AdminNode selectedNode = healthyNodes.get(index); + + log.debug("RoundRobin selector selected admin node: {}", selectedNode.getAddress()); + return selectedNode; + } +} diff --git a/client/src/main/java/org/dromara/dynamictp/client/selector/WeightedRoundRobinAdminNodeSelector.java b/client/src/main/java/org/dromara/dynamictp/client/selector/WeightedRoundRobinAdminNodeSelector.java new file mode 100644 index 000000000..1bab1c7dd --- /dev/null +++ b/client/src/main/java/org/dromara/dynamictp/client/selector/WeightedRoundRobinAdminNodeSelector.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.client.selector; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.client.node.AdminNode; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** + * 加权轮询选择器实现 + * + * @author eachann + * @since 1.2.3 + */ +@Slf4j +public class WeightedRoundRobinAdminNodeSelector implements AdminNodeSelector { + + private final AtomicInteger currentWeight = new AtomicInteger(0); + private final AtomicInteger currentIndex = new AtomicInteger(0); + + @Override + public AdminNode select(List nodes) { + if (nodes == null || nodes.isEmpty()) { + log.warn("No available admin nodes for selection"); + return null; + } + + // 过滤出健康的节点 + List healthyNodes = nodes.stream() + .filter(AdminNode::isAvailable) + .collect(Collectors.toList()); + + if (healthyNodes.isEmpty()) { + log.warn("No healthy admin nodes available, using all nodes"); + healthyNodes = nodes; + } + + if (healthyNodes.size() == 1) { + return healthyNodes.get(0); + } + + // 计算最大权重 + int maxWeight = healthyNodes.stream() + .mapToInt(AdminNode::getWeight) + .max() + .orElse(1); + + // 计算权重总和 + int weightSum = healthyNodes.stream() + .mapToInt(AdminNode::getWeight) + .sum(); + + AdminNode selectedNode = null; + while (selectedNode == null) { + int current = currentIndex.get(); + int weight = currentWeight.get(); + + if (weight == 0) { + weight = maxWeight; + currentWeight.set(weight); + } + + AdminNode node = healthyNodes.get(current); + if (node.getWeight() >= weight) { + selectedNode = node; + currentWeight.addAndGet(-weight); + } else { + currentWeight.addAndGet(-node.getWeight()); + } + + currentIndex.set((current + 1) % healthyNodes.size()); + } + + log.debug("WeightedRoundRobin selector selected admin node: {}", selectedNode.getAddress()); + return selectedNode; + } +} + diff --git a/client/src/main/resources/META-INF/services/org.dromara.dynamictp.core.monitor.collector.MetricsCollector b/client/src/main/resources/META-INF/services/org.dromara.dynamictp.core.monitor.collector.MetricsCollector new file mode 100644 index 000000000..f2b87fdc4 --- /dev/null +++ b/client/src/main/resources/META-INF/services/org.dromara.dynamictp.core.monitor.collector.MetricsCollector @@ -0,0 +1 @@ +org.dromara.dynamictp.client.handler.collector.AdminCollector \ No newline at end of file diff --git a/client/src/main/resources/META-INF/spring.factories b/client/src/main/resources/META-INF/spring.factories new file mode 100644 index 000000000..a6fdd56fc --- /dev/null +++ b/client/src/main/resources/META-INF/spring.factories @@ -0,0 +1,4 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + org.dromara.dynamictp.client.autoconfigure.AdminAutoConfiguration +org.springframework.boot.env.EnvironmentPostProcessor=\ + org.dromara.dynamictp.client.autoconfigure.AdminConfigEnvironmentProcessor diff --git a/common/src/main/java/org/dromara/dynamictp/common/em/AdminRequestTypeEnum.java b/common/src/main/java/org/dromara/dynamictp/common/em/AdminRequestTypeEnum.java new file mode 100644 index 000000000..0187e0062 --- /dev/null +++ b/common/src/main/java/org/dromara/dynamictp/common/em/AdminRequestTypeEnum.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.common.em; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * Admin request type enum. + * + * @author eachann + */ +@Getter +@AllArgsConstructor +public enum AdminRequestTypeEnum { + + EXECUTOR_MONITOR("executor_monitor"), + + EXECUTOR_REFRESH("executor_refresh"), + + ALARM_MANAGE("alarm_manage"), + + LOG_MANAGE("log_manage"); + + private final String value; + + public static AdminRequestTypeEnum of(String value) { + for (AdminRequestTypeEnum adminRequestType : AdminRequestTypeEnum.values()) { + if (adminRequestType.value.equals(value)) { + return adminRequestType; + } + } + return null; + } +} diff --git a/common/src/main/java/org/dromara/dynamictp/common/em/CollectorTypeEnum.java b/common/src/main/java/org/dromara/dynamictp/common/em/CollectorTypeEnum.java index 7356cc52e..a742571c9 100644 --- a/common/src/main/java/org/dromara/dynamictp/common/em/CollectorTypeEnum.java +++ b/common/src/main/java/org/dromara/dynamictp/common/em/CollectorTypeEnum.java @@ -46,6 +46,11 @@ public enum CollectorTypeEnum { /** * JMX collect type. */ - JMX + JMX, + + /** + * ADMIN collect type. + */ + ADMIN } diff --git a/common/src/main/java/org/dromara/dynamictp/common/entity/AdminRequestBody.java b/common/src/main/java/org/dromara/dynamictp/common/entity/AdminRequestBody.java new file mode 100644 index 000000000..84eec0759 --- /dev/null +++ b/common/src/main/java/org/dromara/dynamictp/common/entity/AdminRequestBody.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.common.entity; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.common.em.AdminRequestTypeEnum; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * @author eachann + */ +@Slf4j +public class AdminRequestBody implements Serializable { + + private static final long serialVersionUID = -1288207208017808618L; + + @Getter + private final long id; + + @Getter + private final AdminRequestTypeEnum requestType; + + @Setter + @Getter + private Object body; + + @Getter + private Map attributes = new HashMap<>(); + + public AdminRequestBody(long id, AdminRequestTypeEnum requestType) { + this.id = id; + this.requestType = requestType; + } + + public AdminRequestBody(long id, AdminRequestTypeEnum requestType, Object body) { + this.id = id; + this.body = body; + this.requestType = requestType; + } + + public void setAttributes(String key, String value) { + this.attributes.put(key, value); + } +} diff --git a/common/src/main/java/org/dromara/dynamictp/common/entity/ThreadPoolStats.java b/common/src/main/java/org/dromara/dynamictp/common/entity/ThreadPoolStats.java index dc6c92ba7..39c124dc2 100644 --- a/common/src/main/java/org/dromara/dynamictp/common/entity/ThreadPoolStats.java +++ b/common/src/main/java/org/dromara/dynamictp/common/entity/ThreadPoolStats.java @@ -20,6 +20,8 @@ import lombok.Data; import lombok.EqualsAndHashCode; +import java.io.Serializable; + /** * ThreadPoolStats related * @@ -28,7 +30,7 @@ **/ @Data @EqualsAndHashCode(callSuper = true) -public class ThreadPoolStats extends Metrics { +public class ThreadPoolStats extends Metrics implements Serializable { /** * 线程池名字 diff --git a/dependencies/pom.xml b/dependencies/pom.xml index f29b7f59d..e56b727c0 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -12,7 +12,7 @@ https://github.com/yanhom1314/dynamic-tp - 1.2.2 + 1.2.3 UTF-8 1.18.24 @@ -36,6 +36,8 @@ 2.6.10 2.0.0 2.15.0.RELEASE + 1.6.12 + 1.4.0 1.5.18 1.2.0 3.14.9 @@ -184,6 +186,18 @@ ${rocketmq.version} + + com.alipay.sofa + bolt + ${sofa-bolt.version} + + + + com.alipay.sofa.common + sofa-common-tools + ${sofa-common-tools.version} + + com.aliyun.openservices ons-client @@ -375,6 +389,12 @@ ${revision} + + org.dromara.dynamictp + dynamic-tp-client + ${revision} + + org.dromara.dynamictp dynamic-tp-spring diff --git a/example/example-adapter/example-adapter-thrift/pom.xml b/example/example-adapter/example-adapter-thrift/pom.xml index 5dbd7694f..948992602 100644 --- a/example/example-adapter/example-adapter-thrift/pom.xml +++ b/example/example-adapter/example-adapter-thrift/pom.xml @@ -9,7 +9,6 @@ ../pom.xml dynamic-tp-example-adapter-thrift - true diff --git a/example/example-admin/pom.xml b/example/example-admin/pom.xml new file mode 100644 index 000000000..e97b88423 --- /dev/null +++ b/example/example-admin/pom.xml @@ -0,0 +1,41 @@ + + + 4.0.0 + + dynamic-tp-example + org.dromara.dynamictp + ${revision} + ../pom.xml + + dynamic-tp-example-admin + + + true + + + + + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.dromara.dynamictp + dynamic-tp-spring-boot-starter-adapter-webserver + ${revision} + + + + org.dromara.dynamictp + dynamic-tp-client + + + + + \ No newline at end of file diff --git a/example/example-admin/src/main/java/org/dromara/dynamictp/example/AdminExampleApplication.java b/example/example-admin/src/main/java/org/dromara/dynamictp/example/AdminExampleApplication.java new file mode 100644 index 000000000..de813439b --- /dev/null +++ b/example/example-admin/src/main/java/org/dromara/dynamictp/example/AdminExampleApplication.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.example; + +import org.dromara.dynamictp.spring.annotation.EnableDynamicTp; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @author Redick01 + */ +@SpringBootApplication +@EnableDynamicTp +public class AdminExampleApplication { + + public static void main(String[] args) { + SpringApplication.run(AdminExampleApplication.class, args); + } +} diff --git a/example/example-admin/src/main/java/org/dromara/dynamictp/example/collector/EsClient.java b/example/example-admin/src/main/java/org/dromara/dynamictp/example/collector/EsClient.java new file mode 100644 index 000000000..8a6f6fd95 --- /dev/null +++ b/example/example-admin/src/main/java/org/dromara/dynamictp/example/collector/EsClient.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.example.collector; + +import lombok.extern.slf4j.Slf4j; + +/** + * EsClient related + * + * @author yanhom + * @since 1.1.0 + */ +@Slf4j +public class EsClient { + + public void save(String json) { + log.info("save to es, json: {}", json); + } +} diff --git a/example/example-admin/src/main/java/org/dromara/dynamictp/example/collector/EsCollector.java b/example/example-admin/src/main/java/org/dromara/dynamictp/example/collector/EsCollector.java new file mode 100644 index 000000000..0dd2c4334 --- /dev/null +++ b/example/example-admin/src/main/java/org/dromara/dynamictp/example/collector/EsCollector.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.example.collector; + +import org.dromara.dynamictp.common.entity.ThreadPoolStats; +import org.dromara.dynamictp.common.util.JsonUtil; +import org.dromara.dynamictp.core.monitor.collector.AbstractCollector; + +/** + * EsCollector related + * + * @author yanhom + * @since 1.1.0 + */ +public class EsCollector extends AbstractCollector { + + private final EsClient esClient; + + public EsCollector() { + this.esClient = new EsClient(); + } + + @Override + public void collect(ThreadPoolStats poolStats) { + esClient.save(JsonUtil.toJson(poolStats)); + } + + @Override + public String type() { + return "es"; + } +} diff --git a/example/example-admin/src/main/java/org/dromara/dynamictp/example/config/ThreadPoolConfiguration.java b/example/example-admin/src/main/java/org/dromara/dynamictp/example/config/ThreadPoolConfiguration.java new file mode 100644 index 000000000..ff51f8d24 --- /dev/null +++ b/example/example-admin/src/main/java/org/dromara/dynamictp/example/config/ThreadPoolConfiguration.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.example.config; + +import org.dromara.dynamictp.core.executor.DtpExecutor; +import org.dromara.dynamictp.core.executor.OrderedDtpExecutor; +import org.dromara.dynamictp.core.support.DynamicTp; +import org.dromara.dynamictp.core.support.ThreadPoolBuilder; +import org.dromara.dynamictp.core.support.ThreadPoolCreator; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.dromara.dynamictp.common.em.QueueTypeEnum.MEMORY_SAFE_LINKED_BLOCKING_QUEUE; +import static org.dromara.dynamictp.common.em.RejectedTypeEnum.CALLER_RUNS_POLICY; + +/** + * @author Redick01 + */ +@Configuration +public class ThreadPoolConfiguration { + + /** + * 通过{@link DynamicTp} 注解定义普通juc线程池,会享受到该框架增强能力,注解名称优先级高于方法名 + * + * @return 线程池实例 + */ + @DynamicTp("jucThreadPoolExecutor") + @Bean + public ThreadPoolExecutor jucThreadPoolExecutor() { + return (ThreadPoolExecutor) Executors.newFixedThreadPool(1); + } + + /** + * 通过{@link DynamicTp} 注解定义spring线程池,会享受到该框架增强能力,注解名称优先级高于方法名 + * + * @return 线程池实例 + */ + @DynamicTp("threadPoolTaskExecutor") + @Bean + public ThreadPoolTaskExecutor threadPoolTaskExecutor() { + return new ThreadPoolTaskExecutor(); + } + + /** + * 通过{@link ThreadPoolCreator} 快速创建一些简单配置的线程池,使用默认参数 + * tips: 建议直接在配置中心配置就行,不用@Bean声明 + * + * @return 线程池实例 + */ + @Bean + public DtpExecutor dtpExecutor0() { + return ThreadPoolCreator.createDynamicFast("dtpExecutor0"); + } + + /** + * 通过{@link ThreadPoolBuilder} 设置详细参数创建动态线程池 + * tips: 建议直接在配置中心配置就行,不用@Bean声明 + * @return 线程池实例 + */ + @Bean + public ThreadPoolExecutor dtpExecutor1() { + return ThreadPoolBuilder.newBuilder() + .threadPoolName("dtpExecutor1") + .threadFactory("test-dtp-common") + .corePoolSize(10) + .maximumPoolSize(15) + .keepAliveTime(40) + .timeUnit(TimeUnit.SECONDS) + .workQueue(MEMORY_SAFE_LINKED_BLOCKING_QUEUE.getName(), 2000) + .buildDynamic(); + } + + /** + * 通过{@link ThreadPoolBuilder} 设置详细参数创建动态线程池 + * eager,参考tomcat线程池设计,适用于处理io密集型任务场景,具体参数可以看代码注释 + * tips: 建议直接在配置中心配置就行,不用@Bean声明 + * @return 线程池实例 + */ + @Bean + public DtpExecutor eagerDtpExecutor() { + return ThreadPoolBuilder.newBuilder() + .threadPoolName("eagerDtpExecutor") + .threadFactory("test-eager") + .corePoolSize(2) + .maximumPoolSize(4) + .queueCapacity(2000) + .eager() + .buildDynamic(); + } + + /** + * 通过{@link ThreadPoolBuilder} 设置详细参数创建动态线程池 + * ordered,适用于处理有序任务场景,任务要实现Ordered接口,具体参数可以看代码注释 + * tips: 建议直接在配置中心配置就行,不用@Bean声明 + * @return 线程池实例 + */ + @Bean + public OrderedDtpExecutor orderedDtpExecutor() { + return ThreadPoolBuilder.newBuilder() + .threadPoolName("orderedDtpExecutor") + .threadFactory("test-ordered") + .corePoolSize(4) + .maximumPoolSize(4) + .queueCapacity(2000) + .buildOrdered(); + } + + /** + * 通过{@link ThreadPoolBuilder} 设置详细参数创建线程池 + * scheduled,适用于处理定时任务场景,具体参数可以看代码注释 + * tips: 建议直接在配置中心配置就行,不用@Bean声明 + * @return 线程池实例 + */ + @Bean + public ScheduledExecutorService scheduledDtpExecutor() { + return ThreadPoolBuilder.newBuilder() + .threadPoolName("scheduledDtpExecutor") + .corePoolSize(2) + .threadFactory("test-scheduled") + .rejectedExecutionHandler(CALLER_RUNS_POLICY.getName()) + .buildScheduled(); + } +} diff --git a/example/example-admin/src/main/java/org/dromara/dynamictp/example/controller/TestController.java b/example/example-admin/src/main/java/org/dromara/dynamictp/example/controller/TestController.java new file mode 100644 index 000000000..04d89aa6a --- /dev/null +++ b/example/example-admin/src/main/java/org/dromara/dynamictp/example/controller/TestController.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.example.controller; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.example.service.TestService; +import org.dromara.dynamictp.common.em.AdminRequestTypeEnum; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author Redick01 + */ +@Slf4j +@RestController +@AllArgsConstructor +public class TestController { + + private final TestService testService; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @GetMapping("/dtp-nacos-example/testAdminClient") + public String testAdminClient() { + return toJson(testService.testAdminClient()); + } + + @GetMapping("/dtp-nacos-example/testAdminClient/{type}") + public String testAdminClientByType(@PathVariable("type") String type) { + AdminRequestTypeEnum requestType = AdminRequestTypeEnum.of(type); + if (requestType == null) { + return "unknown type: " + type; + } + return toJson(testService.testAdminClient(requestType)); + } + + @GetMapping("/dtp-nacos-example/testAdminClientAll") + public String testAdminClientAll() { + return toJson(testService.testAdminClientAll()); + } + + private String toJson(Object value) { + try { + return OBJECT_MAPPER.writeValueAsString(value); + } catch (JsonProcessingException e) { + return String.valueOf(value); + } + } + +} diff --git a/example/example-admin/src/main/java/org/dromara/dynamictp/example/notifier/SmsClient.java b/example/example-admin/src/main/java/org/dromara/dynamictp/example/notifier/SmsClient.java new file mode 100644 index 000000000..61c3a7b48 --- /dev/null +++ b/example/example-admin/src/main/java/org/dromara/dynamictp/example/notifier/SmsClient.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.example.notifier; + +import lombok.extern.slf4j.Slf4j; + +/** + * SmsClient related + * + * @author yanhom + * @since 1.1.0 + */ +@Slf4j +public class SmsClient { + + public void send(String secret, String[] receivers, String content) { + log.info("send sms, secret: {}, receivers: {}, content: {}", secret, receivers, content); + } +} diff --git a/example/example-admin/src/main/java/org/dromara/dynamictp/example/notifier/SmsDtpNotifier.java b/example/example-admin/src/main/java/org/dromara/dynamictp/example/notifier/SmsDtpNotifier.java new file mode 100644 index 000000000..761568164 --- /dev/null +++ b/example/example-admin/src/main/java/org/dromara/dynamictp/example/notifier/SmsDtpNotifier.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.example.notifier; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.dromara.dynamictp.core.notifier.AbstractDtpNotifier; +import org.slf4j.MDC; + +import static org.dromara.dynamictp.common.constant.DynamicTpConst.TRACE_ID; +import static org.dromara.dynamictp.common.constant.DynamicTpConst.UNKNOWN; + +/** + * SmsDtpNotifier related + * + * @author yanhom + * @since 1.1.0 + */ +public class SmsDtpNotifier extends AbstractDtpNotifier { + + public SmsDtpNotifier() { + super(new SmsNotifier(new SmsClient())); + } + + @Override + public String platform() { + return "sms"; + } + + @Override + protected String getNoticeTemplate() { + return SmsNotifyConst.SMS_NOTICE_TEMPLATE; + } + + @Override + protected String getAlarmTemplate() { + return SmsNotifyConst.SMS_ALARM_TEMPLATE; + } + + @Override + protected Pair getColors() { + return null; + } + + @Override + protected String getTraceInfo() { + if (StringUtils.isBlank(MDC.get(TRACE_ID))) { + return UNKNOWN; + } + return "[跳转详情](" + getKibanaUrl(MDC.get(TRACE_ID)) + ")"; + } + + @Override + protected String getExtInfo() { + String extInfo = super.getExtInfo(); + String memoryMetrics = getMemoryMetrics(); + if (StringUtils.isBlank(extInfo)) { + return memoryMetrics; + } + return extInfo + "\n" + memoryMetrics; + } + + private String getKibanaUrl(String traceId) { + return "https://kibana.com/app/kibana#/discover?_g=()&_a=(columns:!(_source),index:'logstash-*',interval:auto,query:(language:lucene,query:'traceId:" + traceId + "'),sort:!('@timestamp',desc))"; + } + + private String getMemoryMetrics() { + int heapInit = 1024; + int heapUsed = 521; + int heapCommitted = 1000; + int heapMax = 1024; + return "MemoryMetrics{" + + "heapInit=" + heapInit + + ", heapUsed=" + heapUsed + + ", heapCommitted=" + heapCommitted + + ", heapMax=" + heapMax + + "}"; + } +} diff --git a/example/example-admin/src/main/java/org/dromara/dynamictp/example/notifier/SmsNotifier.java b/example/example-admin/src/main/java/org/dromara/dynamictp/example/notifier/SmsNotifier.java new file mode 100644 index 000000000..29c512124 --- /dev/null +++ b/example/example-admin/src/main/java/org/dromara/dynamictp/example/notifier/SmsNotifier.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.example.notifier; + +import org.dromara.dynamictp.common.entity.NotifyPlatform; +import org.dromara.dynamictp.common.notifier.AbstractNotifier; + +/** + * SmsNotifier related + * + * @author yanhom + * @since 1.1.0 + */ +public class SmsNotifier extends AbstractNotifier { + + private final SmsClient smsClient; + + public SmsNotifier(SmsClient smsClient) { + this.smsClient = smsClient; + } + + @Override + public String platform() { + return "sms"; + } + + @Override + protected void send0(NotifyPlatform platform, String content) { + String[] receivers = platform.getReceivers().split(","); + smsClient.send(platform.getSecret(), receivers, content); + } +} diff --git a/example/example-admin/src/main/java/org/dromara/dynamictp/example/notifier/SmsNotifyConst.java b/example/example-admin/src/main/java/org/dromara/dynamictp/example/notifier/SmsNotifyConst.java new file mode 100644 index 000000000..c7807ffea --- /dev/null +++ b/example/example-admin/src/main/java/org/dromara/dynamictp/example/notifier/SmsNotifyConst.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.example.notifier; + +/** + * SmsNotifyConst related + * + * @author yanhom + * @since 1.1.0 + */ +public class SmsNotifyConst { + + private SmsNotifyConst() { } + + public static final String SMS_ALARM_TEMPLATE = + "服务名称:%s \n" + + "实例信息:%s \n" + + "环境:%s \n" + + "线程池名称:%s \n" + + "报警项:%s \n" + + "报警阈值 / 当前值:%s \n" + + "核心线程数:%s \n" + + "最大线程数:%s \n" + + "当前线程数:%s \n" + + "活跃线程数:%s \n" + + "历史最大线程数:%s \n" + + "任务总数:%s \n" + + "执行完成任务数:%s \n" + + "等待执行任务数:%s \n" + + "队列类型:%s \n" + + "队列容量:%s \n" + + "队列任务数量:%s \n" + + "队列剩余容量:%s \n" + + "拒绝策略:%s \n" + + "总拒绝任务数量:%s \n" + + "总执行超时任务数量:%s \n" + + "总等待超时任务数量:%s \n" + + "上次报警时间:%s \n" + + "报警时间:%s \n" + + "接收人:@%s \n" + + "统计周期:%ss \n" + + "静默时长:%ss \n" + + "trace 信息:%s \n" + + "扩展信息:%s \n"; + + public static final String SMS_NOTICE_TEMPLATE = + "服务名称:%s \n" + + "实例信息:%s \n" + + "环境:%s \n" + + "线程池名称:%s \n" + + "核心线程数:%s => %s \n" + + "最大线程数:%s => %s \n" + + "允许核心线程超时:%s => %s \n" + + "线程存活时间:%ss => %ss \n" + + "队列类型:%s \n" + + "队列容量:%s => %s \n" + + "拒绝策略:%s => %s \n" + + "接收人:@%s \n" + + "通知时间:%s \n"; +} + diff --git a/example/example-admin/src/main/java/org/dromara/dynamictp/example/service/TestService.java b/example/example-admin/src/main/java/org/dromara/dynamictp/example/service/TestService.java new file mode 100644 index 000000000..263104cba --- /dev/null +++ b/example/example-admin/src/main/java/org/dromara/dynamictp/example/service/TestService.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.example.service; + +import org.dromara.dynamictp.common.em.AdminRequestTypeEnum; +import java.util.Map; + +/** + * TestService related + * + * @author yanhom + * @since 1.1.0 + */ +public interface TestService { + + Object testAdminClient(); + + /** + * Invoke admin with the specified request type. + * + * @param type admin request type + */ + Object testAdminClient(AdminRequestTypeEnum type); + + /** + * Invoke admin with all available request types. + */ + Map testAdminClientAll(); + +} diff --git a/example/example-admin/src/main/java/org/dromara/dynamictp/example/service/impl/TestServiceImpl.java b/example/example-admin/src/main/java/org/dromara/dynamictp/example/service/impl/TestServiceImpl.java new file mode 100644 index 000000000..b72fab7c4 --- /dev/null +++ b/example/example-admin/src/main/java/org/dromara/dynamictp/example/service/impl/TestServiceImpl.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.example.service.impl; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.core.executor.DtpExecutor; +import org.dromara.dynamictp.core.executor.OrderedDtpExecutor; +import org.dromara.dynamictp.example.service.TestService; +import org.dromara.dynamictp.client.AdminClient; +import org.dromara.dynamictp.common.em.AdminRequestTypeEnum; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Service; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * TestServiceImpl related + * + * @author yanhom + * @since 1.1.0 + */ +@Slf4j +@Service +public class TestServiceImpl implements TestService { + + private final ThreadPoolExecutor jucThreadPoolExecutor; + + private final ThreadPoolTaskExecutor threadPoolTaskExecutor; + + private final DtpExecutor eagerDtpExecutor; + + private final ScheduledExecutorService scheduledDtpExecutor; + + private final OrderedDtpExecutor orderedDtpExecutor; + + private AdminClient adminClient; + + public TestServiceImpl(ThreadPoolExecutor jucThreadPoolExecutor, + ThreadPoolTaskExecutor threadPoolTaskExecutor, + DtpExecutor eagerDtpExecutor, + ScheduledExecutorService scheduledDtpExecutor, + OrderedDtpExecutor orderedDtpExecutor, + AdminClient adminClient) { + this.jucThreadPoolExecutor = jucThreadPoolExecutor; + this.threadPoolTaskExecutor = threadPoolTaskExecutor; + this.eagerDtpExecutor = eagerDtpExecutor; + this.scheduledDtpExecutor = scheduledDtpExecutor; + this.orderedDtpExecutor = orderedDtpExecutor; + this.adminClient = adminClient; + } + + @Override + public Object testAdminClient() { + Object resp = adminClient.requestToServer(AdminRequestTypeEnum.ALARM_MANAGE); + log.info("testAdminClient,remoteAddress:{}", adminClient.getConnection().getRemoteAddress()); + return resp; + } + + @Override + public Object testAdminClient(AdminRequestTypeEnum type) { + Object resp = adminClient.requestToServer(type); + log.info("testAdminClient type:{}, remoteAddress:{}", type, adminClient.getConnection().getRemoteAddress()); + return resp; + } + + @Override + public java.util.Map testAdminClientAll() { + java.util.Map result = new java.util.LinkedHashMap<>(); + for (AdminRequestTypeEnum type : AdminRequestTypeEnum.values()) { + Object resp = adminClient.requestToServer(type); + log.info("testAdminClient type:{}, remoteAddress:{}", type, adminClient.getConnection().getRemoteAddress()); + result.put(type.name(), resp); + } + return result; + } + +} diff --git a/example/example-admin/src/main/resources/application-cluster-example.yml b/example/example-admin/src/main/resources/application-cluster-example.yml new file mode 100644 index 000000000..c4325f1c4 --- /dev/null +++ b/example/example-admin/src/main/resources/application-cluster-example.yml @@ -0,0 +1,44 @@ +# DynamicTp Admin Client 集群和负载均衡配置示例 +# 将此配置添加到你的 application.yml 或 bootstrap.yml 中 + +dynamictp: + # 集群配置 - 必须配置多个admin节点 + # 格式:ip:port:weight,ip:port:weight + # weight为可选参数,默认为1 + adminNodes: > + 192.168.1.100:8989:2, + 192.168.1.101:8989:1, + 192.168.1.102:8989:3 + + # 负载均衡策略 + # 可选值:roundRobin(轮询,默认)、random(随机)、weighted(加权轮询) + loadBalanceStrategy: roundRobin + + # 其他配置 + clientName: ${spring.application.name} + adminEnabled: true +# 集群配置说明: +# 1. adminNodes: 必须配置多个admin节点,用逗号分隔 +# - 每个节点格式:ip:port:weight +# - weight为权重,数值越大被选中的概率越高 +# - 如果不指定weight,默认为1 +# - 至少需要配置2个节点才能形成集群 +# +# 2. loadBalanceStrategy: 负载均衡策略 +# - roundRobin: 轮询选择,按顺序选择节点 +# - random: 随机选择,每次随机选择一个可用节点 +# - weighted: 加权轮询,根据节点权重进行选择 +# +# 3. 故障转移:当某个节点不可用时,自动选择其他健康节点 +# 4. 健康检查:定期检查节点健康状态,自动标记不健康节点 +# 5. 权重支持:支持为不同节点设置不同权重,实现负载分配 +# +# 重要提示: +# - adminNodes 配置项是必须的,不能为空 +# - 至少需要配置2个节点才能启用集群功能 +# - 单个节点配置将导致启动失败 +# +# 示例场景: +# - 生产环境:配置多个admin节点,使用加权轮询 +# - 测试环境:配置2-3个节点,使用轮询策略 +# - 高可用:配置多个节点,自动故障转移 diff --git a/example/example-admin/src/main/resources/application.yml b/example/example-admin/src/main/resources/application.yml new file mode 100644 index 000000000..41d689573 --- /dev/null +++ b/example/example-admin/src/main/resources/application.yml @@ -0,0 +1,24 @@ +server: + port: 9100 + +spring: + application: + name: dynamic-tp-admin-demo + profiles: + active: dev + redis: + host: ${redis:localhost} + port: 6379 + +dynamictp: + adminEnabled: true + clientName: dynamic-tp-admin-demo + collector-types: admin + # 集群配置 - 必须配置多个admin节点 + # 格式:ip:port:weight,ip:port:weight + # weight为可选参数,默认为1 + adminNodes: > + 127.0.0.1:8989 + # 负载均衡策略 + # 可选值:roundRobin(轮询,默认)、random(随机)、weighted(加权轮询) + loadBalanceStrategy: roundRobin \ No newline at end of file diff --git a/example/pom.xml b/example/pom.xml index 75ea8f54b..fc23530bb 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -16,6 +16,7 @@ + example-admin example-nacos example-nacos-cloud example-apollo diff --git a/pom.xml b/pom.xml index 3520a6f68..b2938d5e5 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ https://github.com/yanhom1314/dynamic-tp - 1.2.2 + 1.2.3 8 8 @@ -39,6 +39,7 @@ adapter core common + client starter logging example @@ -242,19 +243,19 @@ - - org.apache.maven.plugins - maven-gpg-plugin - 1.6 - - - verify - - sign - - - - + + + + + + + + + + + + + org.sonatype.central diff --git a/spring/src/main/java/org/dromara/dynamictp/spring/DtpBaseBeanConfiguration.java b/spring/src/main/java/org/dromara/dynamictp/spring/DtpBaseBeanConfiguration.java index e3ea07088..d0d7c8302 100644 --- a/spring/src/main/java/org/dromara/dynamictp/spring/DtpBaseBeanConfiguration.java +++ b/spring/src/main/java/org/dromara/dynamictp/spring/DtpBaseBeanConfiguration.java @@ -74,4 +74,5 @@ public DtpLifecycleSpringAdapter dtpLifecycleSpringAdapter(LifeCycleManagement l public DtpApplicationListener dtpApplicationListener() { return new DtpApplicationListener(); } + } diff --git a/spring/src/main/java/org/dromara/dynamictp/spring/annotation/DtpBeanDefinitionRegistrar.java b/spring/src/main/java/org/dromara/dynamictp/spring/annotation/DtpBeanDefinitionRegistrar.java index d0d7c8c89..b4dc1491a 100644 --- a/spring/src/main/java/org/dromara/dynamictp/spring/annotation/DtpBeanDefinitionRegistrar.java +++ b/spring/src/main/java/org/dromara/dynamictp/spring/annotation/DtpBeanDefinitionRegistrar.java @@ -129,7 +129,8 @@ private Object[] buildConstructorArgs(Class clazz, DtpExecutorProps props) { if (clazz.equals(EagerDtpExecutor.class)) { taskQueue = new TaskQueue(props.getQueueCapacity()); } else if (clazz.equals(PriorityDtpExecutor.class)) { - taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), PriorityDtpExecutor.getRunnableComparator()); + taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), + PriorityDtpExecutor.getRunnableComparator()); } else { taskQueue = buildLbq(props.getQueueType(), props.getQueueCapacity(), @@ -137,7 +138,7 @@ private Object[] buildConstructorArgs(Class clazz, DtpExecutorProps props) { props.getMaxFreeMemory()); } - return new Object[]{ + return new Object[] { props.getCorePoolSize(), props.getMaximumPoolSize(), props.getKeepAliveTime(),