Skip to content

[AMQ-9593] Add option to clean scheduled messages on startup #1352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public class BrokerService implements Service {
private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
// to other jms messaging systems
private boolean deleteAllMessagesOnStartup;
private boolean deleteAllScheduledMessagesOnStartup = false;
Copy link
Contributor

@mattrpav mattrpav Nov 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scheduled message support is part of the JMS 3.1/2.0 spec. I'm not loving the separate config flag. We don't have a 'deleteAllDurableTopicMessagesOnStartup' flag. It is 'all' messages on startup.

I'm thinking it would be better to have a flag to have the scheduler ignore the deleteAllMessagesOnStartup.

Possible path forward:
v6.x - Scheduler ignores deleteAllMessagesOnStartup by default (to maintain existing behavior)
v7.x - Scheduler honors deleteAllMessagesOnStartup by default

Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see where you are coming from. Initially I would expect deleteAllMessagesOnStartup will include scheduled messages. I even opened a Jira ticket on it. https://issues.apache.org/jira/projects/AMQ/issues/AMQ-9572?filter=allissues from @jbonofre feedback we should have a separated flag for scheduled messages.

Regarding the proposal in v7.x I think if we have the consensus that deleteAllMessagesOnStartup should include scheduled messages then it makes sense to me.

private boolean advisorySupport = true;
private boolean anonymousProducerAdvisorySupport = false;
private URI vmConnectorURI;
Expand Down Expand Up @@ -1630,6 +1631,18 @@ public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStar
this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
}

/**
* Sets whether all scheduled messages are deleted on startup
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
*/
public void setDeleteAllScheduledMessagesOnStartup(boolean deleteAllScheduledMessagesOnStartup) {
this.deleteAllScheduledMessagesOnStartup = deleteAllScheduledMessagesOnStartup;
}

public boolean isDeleteAllScheduledMessagesOnStartup() {
return deleteAllScheduledMessagesOnStartup;
}

public URI getVmConnectorURI() {
if (vmConnectorURI == null) {
try {
Expand Down Expand Up @@ -2440,6 +2453,7 @@ protected Broker addInterceptors(Broker broker) throws Exception {
if (isSchedulerSupport()) {
SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
sb.setMaxRepeatAllowed(maxSchedulerRepeatAllowed);
sb.setDeleteAllScheduledMessagesOnStartup(deleteAllScheduledMessagesOnStartup);
if (isUseJmx()) {
JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
private final JobSchedulerStore store;
private JobScheduler scheduler;
private int maxRepeatAllowed = MAX_REPEAT_ALLOWED;
private boolean deleteAllScheduledMessagesOnStartup;

public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception {
super(next);
Expand Down Expand Up @@ -212,6 +213,9 @@ public synchronized JobScheduler getJobScheduler() throws Exception {
public void start() throws Exception {
this.started.set(true);
getInternalScheduler();
if (deleteAllScheduledMessagesOnStartup) {
deleteAllScheduledMessages();
}
super.start();
}

Expand Down Expand Up @@ -364,6 +368,11 @@ private void doSchedule(Message messageSend, Object cronValue, Object periodValu
new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat);
}

private void deleteAllScheduledMessages() throws Exception {
LOG.info("Deleting all scheduled messages on startup because deleteAllScheduledMessagesOnStartup configuration has been provided");
getInternalScheduler().removeAllJobs();
}

@Override
public void scheduledJob(String id, ByteSequence job) {
org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength());
Expand Down Expand Up @@ -487,4 +496,12 @@ public int getMaxRepeatAllowed() {
public void setMaxRepeatAllowed(int maxRepeatAllowed) {
this.maxRepeatAllowed = maxRepeatAllowed;
}

public boolean getDeleteAllScheduledMessagesOnStartup() {
return deleteAllScheduledMessagesOnStartup;
}

public void setDeleteAllScheduledMessagesOnStartup(boolean deleteAllSchedulerdMessagesOnStartup) {
this.deleteAllScheduledMessagesOnStartup = deleteAllSchedulerdMessagesOnStartup;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.broker.scheduler;

import jakarta.jms.Connection;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import org.apache.activemq.ScheduledMessage;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

public class JmsSchedulerDeleteAllMessageOnStartupOptionTest extends JobSchedulerTestSupport {

private static final transient Logger LOG = LoggerFactory.getLogger(JmsSchedulerDeleteAllMessageOnStartupOptionTest.class);

@Override
protected boolean shouldDeleteAllScheduledMessagesOnStartup() throws Exception {
return true;
}

@Test
public void testDeleteAllMessageOnRestart() throws Exception {
// Send a message delayed by 8 seconds
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
long time_ms = 10 * 1000;
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time_ms);
producer.send(message);
producer.close();
// Shutdown broker
restartBroker(RestartType.NORMAL);
// Make sure the consumer won't get the message
connection = createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
final int COUNT = 1;
final CountDownLatch latch = new CountDownLatch(COUNT);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
latch.countDown();
}
});
latch.await(20, TimeUnit.SECONDS);
assertEquals(latch.getCount(), COUNT);
connection.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ protected BrokerService createBroker(boolean delete) throws Exception {
answer.setSchedulerDirectoryFile(schedulerDirectory);
answer.setSchedulerSupport(true);
answer.setUseJmx(isUseJmx());
answer.setDeleteAllScheduledMessagesOnStartup(shouldDeleteAllScheduledMessagesOnStartup());
return answer;
}

Expand All @@ -136,4 +137,8 @@ protected void restartBroker(RestartType restartType) throws Exception {
broker.start();
broker.waitUntilStarted();
}

protected boolean shouldDeleteAllScheduledMessagesOnStartup() throws Exception {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public void testBrokerConfig() throws Exception {
assertEquals("Broker Config Error (persistent)", false, broker.isPersistent());
assertEquals("Broker Config Error (useShutdownHook)", false, broker.isUseShutdownHook());
assertEquals("Broker Config Error (deleteAllMessagesOnStartup)", true, broker.isDeleteAllMessagesOnStartup());
assertEquals("Broker Config Error (deleteAllScheduledMessagesOnStartup)", true, broker.isDeleteAllScheduledMessagesOnStartup());
LOG.info("Success");

// Check specific vm transport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<amq:broker brokerName="brokerConfigTest" populateJMSXUserID="false"
useLoggingForShutdownErrors="true" useJmx="true"
persistent="false" vmConnectorURI="vm://javacoola"
useShutdownHook="false" deleteAllMessagesOnStartup="true">
useShutdownHook="false" deleteAllMessagesOnStartup="true" deleteAllScheduledMessagesOnStartup="true">

<!--
|| NOTE this config file is used for unit testing the configuration mechanism
Expand Down