KafkaMicroBatchStream.scala
@@ -89,6 +89,8 @@ private[kafka010] class KafkaMicroBatchStream(

   private var allDataForTriggerAvailableNow: PartitionOffsetMap = _

+  private var isTriggerAvailableNow: Boolean = false
+
   /**
    * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    * called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -124,8 +126,14 @@ private[kafka010] class KafkaMicroBatchStream(
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets

     // Use the pre-fetched list of partition offsets when Trigger.AvailableNow is enabled.
-    latestPartitionOffsets = if (allDataForTriggerAvailableNow != null) {
-      allDataForTriggerAvailableNow
+    latestPartitionOffsets = if (isTriggerAvailableNow) {
+      if (allDataForTriggerAvailableNow != null) {
+        allDataForTriggerAvailableNow
+      } else {
+        allDataForTriggerAvailableNow =
+          kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
+        allDataForTriggerAvailableNow
+      }
     } else {
       kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
     }
@@ -357,8 +365,7 @@ private[kafka010] class KafkaMicroBatchStream(
   }

   override def prepareForTriggerAvailableNow(): Unit = {
-    allDataForTriggerAvailableNow = kafkaOffsetReader.fetchLatestOffsets(
-      Some(getOrCreateInitialPartitionOffsets()))
+    isTriggerAvailableNow = true
   }
 }

KafkaSource.scala
@@ -108,6 +108,8 @@ private[kafka010] class KafkaSource(

   private var allDataForTriggerAvailableNow: PartitionOffsetMap = _

+  private var isTriggerAvailableNow = false
+
   /**
    * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    * called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -171,8 +173,13 @@ private[kafka010] class KafkaSource(
     val currentOffsets = currentPartitionOffsets.orElse(Some(initialPartitionOffsets))

     // Use the pre-fetched list of partition offsets when Trigger.AvailableNow is enabled.
-    val latest = if (allDataForTriggerAvailableNow != null) {
-      allDataForTriggerAvailableNow
+    val latest = if (isTriggerAvailableNow) {
+      if (allDataForTriggerAvailableNow != null) {
+        allDataForTriggerAvailableNow
+      } else {
+        allDataForTriggerAvailableNow = kafkaReader.fetchLatestOffsets(currentOffsets)
+        allDataForTriggerAvailableNow
+      }
     } else {
       kafkaReader.fetchLatestOffsets(currentOffsets)
     }
@@ -399,7 +406,7 @@ private[kafka010] class KafkaSource(
   }

   override def prepareForTriggerAvailableNow(): Unit = {
-    allDataForTriggerAvailableNow = kafkaReader.fetchLatestOffsets(Some(initialPartitionOffsets))
+    isTriggerAvailableNow = true
   }
 }
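
Both connector diffs above make the same change: `prepareForTriggerAvailableNow` now only flips a flag, and the snapshot of target offsets is fetched lazily on the first planning call and then pinned for the rest of the run. Previously, fetching eagerly in `prepareForTriggerAvailableNow` meant a restarted query retrying an uncommitted batch would verify a replayed end offset (taken from the checkpoint, possibly recorded before new partitions appeared) against a freshly fetched snapshot, fail, and crash-loop. Distilled into a standalone idiom, the new behavior is roughly the sketch below; `AvailableNowSnapshot` and `fetchLatest` are illustrative names, not part of the patch:

```scala
// A minimal distillation of the pattern shared by KafkaMicroBatchStream and
// KafkaSource above. `fetchLatest` stands in for fetchLatestOffsets; the class
// name is hypothetical.
final class AvailableNowSnapshot[T <: AnyRef](fetchLatest: () => T) {
  private var prepared = false  // set when Trigger.AvailableNow is used
  private var snapshot: T = _   // pinned target offsets, fetched lazily

  // Called before the run starts; deliberately does NOT fetch offsets, so a
  // retried first batch is never checked against a newer snapshot.
  def prepare(): Unit = prepared = true

  // Called at batch planning time.
  def latest(): T =
    if (!prepared) {
      fetchLatest()  // normal trigger: always fetch fresh offsets
    } else {
      if (snapshot == null) snapshot = fetchLatest()  // first call: pin target
      snapshot
    }
}
```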

KafkaMicroBatchSourceSuite.scala
@@ -36,7 +36,7 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.matchers.should._
 import org.scalatest.time.SpanSugar._

-import org.apache.spark.TestUtils
+import org.apache.spark.{SparkException, TestUtils}
 import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
@@ -2056,6 +2056,44 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       "subscribePattern" -> s"$topicPrefix-.*")
   }

+  test("SPARK-53560: no crash looping during uncommitted batch retry in AvailableNow trigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, (1 to 7).map(_.toString).toArray, Some(0))
+    def udfFailOn7(x: Int): Int = {
+      if (x == 7) throw new RuntimeException("error for 7")
+      x
+    }
+    val kafka =
+      spark.readStream.format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("subscribe", topic)
+        .option("startingOffsets", "earliest")
+        .load()
+        .select(expr("CAST(CAST(value AS STRING) AS INT)").as("value"))
+        .as[Int]
+        .map(udfFailOn7)
+
+    withTempDir { dir =>
+      testStream(kafka)(
+        StartStream(Trigger.AvailableNow, checkpointLocation = dir.getAbsolutePath),
+        ExpectFailure[SparkException] { e =>
+          assert(e.getMessage.contains("error for 7"))
+        },
+        AssertOnQuery { q =>
+          testUtils.addPartitions(topic, 2)
+          !q.isActive
+        },
+        StartStream(Trigger.AvailableNow, checkpointLocation = dir.getAbsolutePath),
+        // Getting this error means the query has passed the planning stage, so
+        // verifyEndOffsetForTriggerAvailableNow succeeds.
+        ExpectFailure[SparkException] { e =>
+          assert(e.getMessage.contains("error for 7"))
+        }
+      )
+    }
+  }
+
   private def testFromSpecificTimestampsWithNoMatchingStartingOffset(
       topic: String,
       options: (String, String)*): Unit = {

SupportsTriggerAvailableNow.java
@@ -36,6 +36,13 @@ public interface SupportsTriggerAvailableNow extends SupportsAdmissionControl {
    * the query). The source will behave as if there is no new data coming in after the target
    * offset, i.e., the source will not return an offset higher than the target offset when
    * {@link #latestOffset(Offset, ReadLimit) latestOffset} is called.
+   * <p>
+   * Note that there is an exception for the first uncommitted batch after a restart, whose end
+   * offset is not derived from the current latest offset. Sources that want to assert such a
+   * relation need to take special care. One possible way is to keep an internal flag in the
+   * source indicating whether the query runs with Trigger.AvailableNow, set the flag in this
+   * method, and record the target offset in the first call to
+   * {@link #latestOffset(Offset, ReadLimit) latestOffset}.
    */
   void prepareForTriggerAvailableNow();
 }