From 0652458a97380720943ad0f8e6ff882ada4ca005 Mon Sep 17 00:00:00 2001 From: Ken Liao Date: Wed, 25 Dec 2024 22:01:48 -0800 Subject: [PATCH] Add option for deleting all scheduled messages on startup --- .../apache/activemq/broker/BrokerService.java | 14 ++++ .../broker/scheduler/SchedulerBroker.java | 17 +++++ ...erDeleteAllMessageOnStartupOptionTest.java | 76 +++++++++++++++++++ .../scheduler/JobSchedulerTestSupport.java | 5 ++ .../apache/activemq/config/ConfigTest.java | 1 + .../org/apache/activemq/config/example.xml | 2 +- 6 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerDeleteAllMessageOnStartupOptionTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index b05c3ec66d8..13bd290a527 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -184,6 +184,7 @@ public class BrokerService implements Service { private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges // to other jms messaging systems private boolean deleteAllMessagesOnStartup; + private boolean deleteAllScheduledMessagesOnStartup = false; private boolean advisorySupport = true; private boolean anonymousProducerAdvisorySupport = false; private URI vmConnectorURI; @@ -1630,6 +1631,18 @@ public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStar this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; } + /** + * Sets whether all scheduled messages are deleted on startup + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" + */ + public void setDeleteAllScheduledMessagesOnStartup(boolean deleteAllScheduledMessagesOnStartup) { + this.deleteAllScheduledMessagesOnStartup = deleteAllScheduledMessagesOnStartup; + } + + public boolean isDeleteAllScheduledMessagesOnStartup() { + return deleteAllScheduledMessagesOnStartup; + } + public URI getVmConnectorURI() { if (vmConnectorURI == null) { try { @@ -2440,6 +2453,7 @@ protected Broker addInterceptors(Broker broker) throws Exception { if (isSchedulerSupport()) { SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore()); sb.setMaxRepeatAllowed(maxSchedulerRepeatAllowed); + sb.setDeleteAllScheduledMessagesOnStartup(deleteAllScheduledMessagesOnStartup); if (isUseJmx()) { JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); try { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java index 3a778c5ad1c..220ae874fa0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java @@ -72,6 +72,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener { private final JobSchedulerStore store; private JobScheduler scheduler; private int maxRepeatAllowed = MAX_REPEAT_ALLOWED; + private boolean deleteAllScheduledMessagesOnStartup; public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception { super(next); @@ -212,6 +213,9 @@ public synchronized JobScheduler getJobScheduler() throws Exception { public void start() throws Exception { this.started.set(true); getInternalScheduler(); + if (deleteAllScheduledMessagesOnStartup) { + deleteAllScheduledMessages(); + } super.start(); } @@ -364,6 +368,11 @@ private void doSchedule(Message messageSend, Object cronValue, Object periodValu new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat); } + private void deleteAllScheduledMessages() throws Exception { + LOG.info("Deleting all scheduled messages on startup because deleteAllScheduledMessagesOnStartup configuration has been provided"); + getInternalScheduler().removeAllJobs(); + } + @Override public void scheduledJob(String id, ByteSequence job) { org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength()); @@ -487,4 +496,12 @@ public int getMaxRepeatAllowed() { public void setMaxRepeatAllowed(int maxRepeatAllowed) { this.maxRepeatAllowed = maxRepeatAllowed; } + + public boolean getDeleteAllScheduledMessagesOnStartup() { + return deleteAllScheduledMessagesOnStartup; + } + + public void setDeleteAllScheduledMessagesOnStartup(boolean deleteAllSchedulerdMessagesOnStartup) { + this.deleteAllScheduledMessagesOnStartup = deleteAllSchedulerdMessagesOnStartup; + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerDeleteAllMessageOnStartupOptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerDeleteAllMessageOnStartupOptionTest.java new file mode 100644 index 00000000000..2eaea7e5cf3 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerDeleteAllMessageOnStartupOptionTest.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.scheduler; + +import jakarta.jms.Connection; +import jakarta.jms.Message; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageListener; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; +import org.apache.activemq.ScheduledMessage; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class JmsSchedulerDeleteAllMessageOnStartupOptionTest extends JobSchedulerTestSupport { + + private static final transient Logger LOG = LoggerFactory.getLogger(JmsSchedulerDeleteAllMessageOnStartupOptionTest.class); + + @Override + protected boolean shouldDeleteAllScheduledMessagesOnStartup() throws Exception { + return true; + } + + @Test + public void testDeleteAllMessageOnRestart() throws Exception { + // Send a message delayed by 8 seconds + Connection connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + long time_ms = 10 * 1000; + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage("test msg"); + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time_ms); + producer.send(message); + producer.close(); + // Shutdown broker + restartBroker(RestartType.NORMAL); + // Make sure the consumer won't get the message + connection = createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(destination); + final int COUNT = 1; + final CountDownLatch latch = new CountDownLatch(COUNT); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + latch.countDown(); + } + }); + latch.await(20, TimeUnit.SECONDS); + assertEquals(latch.getCount(), COUNT); + connection.close(); + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java index ed2ca444c37..7e4759aff3e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java @@ -116,6 +116,7 @@ protected BrokerService createBroker(boolean delete) throws Exception { answer.setSchedulerDirectoryFile(schedulerDirectory); answer.setSchedulerSupport(true); answer.setUseJmx(isUseJmx()); + answer.setDeleteAllScheduledMessagesOnStartup(shouldDeleteAllScheduledMessagesOnStartup()); return answer; } @@ -136,4 +137,8 @@ protected void restartBroker(RestartType restartType) throws Exception { broker.start(); broker.waitUntilStarted(); } + + protected boolean shouldDeleteAllScheduledMessagesOnStartup() throws Exception { + return false; + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/config/ConfigTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/config/ConfigTest.java index 2c4e3279068..d0fae18a648 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/config/ConfigTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/config/ConfigTest.java @@ -170,6 +170,7 @@ public void testBrokerConfig() throws Exception { assertEquals("Broker Config Error (persistent)", false, broker.isPersistent()); assertEquals("Broker Config Error (useShutdownHook)", false, broker.isUseShutdownHook()); assertEquals("Broker Config Error (deleteAllMessagesOnStartup)", true, broker.isDeleteAllMessagesOnStartup()); + assertEquals("Broker Config Error (deleteAllScheduledMessagesOnStartup)", true, broker.isDeleteAllScheduledMessagesOnStartup()); LOG.info("Success"); // Check specific vm transport diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/config/example.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/config/example.xml index 0a7a6864357..1903cc79a29 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/config/example.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/config/example.xml @@ -26,7 +26,7 @@ + useShutdownHook="false" deleteAllMessagesOnStartup="true" deleteAllScheduledMessagesOnStartup="true">