Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -603,10 +603,6 @@ private async Task<EventProcessorCheckpoint> CreateLegacyCheckpoint(string fully
{
startingPosition ??= EventPosition.FromOffset(offset, false);
}
else if (sequenceNumber.HasValue && sequenceNumber.Value != long.MinValue)
{
startingPosition = EventPosition.FromSequenceNumber(sequenceNumber.Value, false);
}
else
{
// Skip checkpoints without an offset without logging an error.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,13 +589,13 @@ public async Task GetCheckpointUsesOffsetAsTheStartingPositionWhenPresentInLegac
containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client =>
{
client.Content = Encoding.UTF8.GetBytes("{" +
"\"PartitionId\":\"0\"," +
"\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," +
"\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," +
"\"Epoch\":386," +
"\"Offset\":\"13\"," +
"\"SequenceNumber\":960180" +
"}");
"\"PartitionId\":\"0\"," +
"\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," +
"\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," +
"\"Epoch\":386," +
"\"Offset\":\"13\"," +
"\"SequenceNumber\":960180" +
"}");
});

var target = new BlobCheckpointStoreInternal(containerClient, initializeWithLegacyCheckpoints: true);
Expand All @@ -606,6 +606,44 @@ public async Task GetCheckpointUsesOffsetAsTheStartingPositionWhenPresentInLegac
Assert.That(checkpoint.PartitionId, Is.EqualTo("0"));
}

/// <summary>
/// Verifies basic functionality of GetCheckpointAsync and ensures the starting position is set correctly.
/// </summary>
///
[Test]
[TestCase(null)]
[TestCase("")]
public async Task GetCheckpointLegacyCheckpointWithoutOffset(string offsetValue)
{
var blobList = new List<BlobItem>
{
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0",
false,
BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)),
"snapshot")
};

var offsetJsonValue = offsetValue is null ? "null" : $"\"{offsetValue}\"";
var containerClient = new MockBlobContainerClient() { Blobs = blobList };

containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client =>
{
client.Content = Encoding.UTF8.GetBytes("{" +
"\"PartitionId\":\"0\"," +
"\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," +
"\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," +
"\"Epoch\":386," +
$"\"Offset\":{offsetJsonValue}," +
"\"SequenceNumber\":960180" +
"}");
});

var target = new BlobCheckpointStoreInternal(containerClient, initializeWithLegacyCheckpoints: true);
var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None);

Assert.That(checkpoint, Is.Null, "A checkpoint should have not been returned.");
}

/// <summary>
/// Verifies basic functionality of GetCheckpointAsync and ensures the appropriate events are emitted on failure.
/// </summary>
Expand Down
2 changes: 0 additions & 2 deletions sdk/eventhub/Azure.Messaging.EventHubs.sln
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ Global
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{327A0C67-D9B6-4831-A53F-BFE059934787} = {4C209B69-98C3-4B12-B777-111EFF0E6F66}
{AF73A238-61D1-4D1C-807F-57C19F193DC6} = {4C209B69-98C3-4B12-B777-111EFF0E6F66}
{5CFD80DB-98F3-4E59-9792-1C421786D3D0} = {4C209B69-98C3-4B12-B777-111EFF0E6F66}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A9B7844C-B066-4B3D-A0E7-D5008C5EC232}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Fixed an issue where a legacy ownership record without checkpoint data was interpreted as a valid checkpoint and used to initialize a partition when no current generation checkpoint was present.

### Other Changes

## 6.5.2 (2025-06-16)
Expand Down