Skip to content

Commit 4219335

Browse files
committed
Check for eligibility on every removed item
Before, the DRR scheduler checked for eligibility if the current head item was removed. This was not enough since a vqueue might have been not eligible because it was running out of concurrency tokens. A removed item could have released the concurrency token and thereby it could make the vqueue eligible again if the vqueue contains more items. Hence, we should check on every removed item whether this event had an impact on the vqueue's eligibility.
1 parent f625425 commit 4219335

File tree

3 files changed

+25
-3
lines changed

3 files changed

+25
-3
lines changed

crates/storage-api/src/vqueue_table/entry.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use restate_types::state_mut::ExternalStateMutation;
1616
use restate_types::vqueue::{
1717
EffectivePriority, NewEntryPriority, VQueueId, VQueueInstance, VQueueParent,
1818
};
19+
use std::fmt::{Debug, Formatter};
1920

2021
use crate::StorageError;
2122

@@ -36,7 +37,7 @@ pub enum EntryKind {
3637

3738
// Using u128 would have added an extra unnecessary 8 bytes due to alignment
3839
// requirements (u128 is 0x10 aligned and it forces the struct to be 0x10 aligned)
39-
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
40+
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
4041
pub struct EntryId([u8; 16]);
4142

4243
impl EntryId {
@@ -61,6 +62,15 @@ impl EntryId {
6162
}
6263
}
6364

65+
impl Debug for EntryId {
66+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67+
// display inner field as u128 to make it a bit easier to read
68+
f.debug_tuple("EntryId")
69+
.field(&u128::from_be_bytes(self.0))
70+
.finish()
71+
}
72+
}
73+
6474
impl From<&InvocationId> for EntryId {
6575
#[inline]
6676
fn from(id: &InvocationId) -> Self {

crates/types/src/clock/unique_timestamp.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11+
use std::fmt::{Debug, Formatter};
1112
use std::num::NonZeroU64;
1213
use std::time::SystemTime;
1314

@@ -50,7 +51,7 @@ pub enum Error {
5051
/// The timestamp is represented as a 64-bit unsigned integer. The upper 42 bits
5152
/// represent the physical time in milliseconds since [`RESTATE_EPOCH`], and the
5253
/// lower 22 bits represent the logical clock count.
53-
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
54+
#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
5455
#[repr(transparent)]
5556
pub struct UniqueTimestamp(NonZeroU64);
5657

@@ -132,6 +133,17 @@ impl UniqueTimestamp {
132133
}
133134
}
134135

136+
impl Debug for UniqueTimestamp {
137+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138+
// Output unique timestamp as a pair of physical and logical timestamps. Convert the physical
139+
// timestamp from the RESTATE_EPOCH to the UNIX_EPOCH.
140+
f.debug_struct("UniqueTimestamp")
141+
.field("physical", &self.to_unix_millis().as_u64())
142+
.field("logical", &self.logical_raw())
143+
.finish()
144+
}
145+
}
146+
135147
impl From<SystemTime> for UniqueTimestamp {
136148
fn from(value: SystemTime) -> Self {
137149
// The assumption is that SystemTime will always be > RESTATE_EPOCH

crates/vqueues/src/scheduler/vqueue_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ where
274274
}
275275

276276
pub fn check_eligibility(
277-
&mut self,
277+
&self,
278278
now: UniqueTimestamp,
279279
meta: &VQueueMeta,
280280
config: &VQueueConfig,

0 commit comments

Comments
 (0)