Skip to content

TQ: Implement prepare and commit for initial config #8682

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions trust-quorum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ bcs.workspace = true
bootstore.workspace = true
camino.workspace = true
chacha20poly1305.workspace = true
daft.workspace = true
derive_more.workspace = true
gfss.workspace = true
hex.workspace = true
Expand Down
85 changes: 53 additions & 32 deletions trust-quorum/src/coordinator_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@

use crate::NodeHandlerCtx;
use crate::crypto::{LrtqShare, Sha3_256Digest, ShareDigestLrtq};
use crate::messages::PeerMsg;
use crate::validators::{ReconfigurationError, ValidatedReconfigureMsg};
use crate::{Configuration, Epoch, PeerMsgKind, PlatformId};
use gfss::shamir::Share;
use slog::{Logger, o, warn};
use std::collections::{BTreeMap, BTreeSet};
use std::time::Instant;

/// The state of a reconfiguration coordinator.
///
Expand All @@ -29,10 +27,6 @@ use std::time::Instant;
pub struct CoordinatorState {
log: Logger,

/// When the reconfiguration started
#[expect(unused)]
start_time: Instant,

/// A copy of the message used to start this reconfiguration
reconfigure_msg: ValidatedReconfigureMsg,

Expand All @@ -42,9 +36,6 @@ pub struct CoordinatorState {

/// What is the coordinator currently doing
op: CoordinatorOperation,

/// When to resend prepare messages next
retry_deadline: Instant,
}

impl CoordinatorState {
Expand All @@ -54,7 +45,6 @@ impl CoordinatorState {
/// `PrepareMsg` so that it can be persisted.
pub fn new_uninitialized(
log: Logger,
now: Instant,
msg: ValidatedReconfigureMsg,
) -> Result<(CoordinatorState, Configuration, Share), ReconfigurationError>
{
Expand All @@ -81,7 +71,7 @@ impl CoordinatorState {
prepare_acks: BTreeSet::new(),
};

let state = CoordinatorState::new(log, now, msg, config.clone(), op);
let state = CoordinatorState::new(log, msg, config.clone(), op);

// Safety: Construction of a `ValidatedReconfigureMsg` ensures that
// `my_platform_id` is part of the new configuration and has a share.
Expand All @@ -92,7 +82,6 @@ impl CoordinatorState {
/// A reconfiguration from one group to another
pub fn new_reconfiguration(
log: Logger,
now: Instant,
msg: ValidatedReconfigureMsg,
last_committed_config: &Configuration,
) -> Result<CoordinatorState, ReconfigurationError> {
Expand All @@ -107,7 +96,7 @@ impl CoordinatorState {
new_shares,
};

Ok(CoordinatorState::new(log, now, msg, config, op))
Ok(CoordinatorState::new(log, msg, config, op))
}

// Intentionally private!
Expand All @@ -116,20 +105,15 @@ impl CoordinatorState {
// more specific, and perform validation of arguments.
fn new(
log: Logger,
now: Instant,
reconfigure_msg: ValidatedReconfigureMsg,
configuration: Configuration,
op: CoordinatorOperation,
) -> CoordinatorState {
// We want to send any pending messages immediately
let retry_deadline = now;
CoordinatorState {
log: log.new(o!("component" => "tq-coordinator-state")),
start_time: now,
reconfigure_msg,
configuration,
op,
retry_deadline,
}
}

Expand All @@ -138,6 +122,10 @@ impl CoordinatorState {
&self.reconfigure_msg
}

pub fn op(&self) -> &CoordinatorOperation {
&self.op
}

// Send any required messages as a reconfiguration coordinator
//
// This varies depending upon the current `CoordinatorState`.
Expand All @@ -147,31 +135,56 @@ impl CoordinatorState {
// will return a copy of it.
//
// This method is "in progress" - allow unused parameters for now
#[expect(unused)]
pub fn send_msgs(&mut self, ctx: &mut impl NodeHandlerCtx) {
let now = ctx.now();
if now < self.retry_deadline {
return;
}
self.retry_deadline = now + self.reconfigure_msg.retry_timeout();
match &self.op {
#[expect(unused)]
CoordinatorOperation::CollectShares {
epoch,
members,
collected_shares,
..
} => {}
#[expect(unused)]
CoordinatorOperation::CollectLrtqShares { members, shares } => {}
CoordinatorOperation::Prepare { prepares, prepare_acks } => {
let rack_id = self.reconfigure_msg.rack_id();
CoordinatorOperation::Prepare { prepares, .. } => {
for (platform_id, (config, share)) in
prepares.clone().into_iter()
{
if ctx.connected().contains(&platform_id) {
ctx.send(
platform_id,
PeerMsgKind::Prepare { config, share },
);
}
}
}
}
}

// Send any required messages to a newly connected node
// This method is "in progress" - allow unused parameters for now
#[expect(unused)]
pub fn send_msgs_to(
&mut self,
ctx: &mut impl NodeHandlerCtx,
to: PlatformId,
) {
match &self.op {
CoordinatorOperation::CollectShares {
epoch,
members,
collected_shares,
..
} => {}
CoordinatorOperation::CollectLrtqShares { members, shares } => {}
CoordinatorOperation::Prepare { prepares, prepare_acks } => {
let rack_id = self.reconfigure_msg.rack_id();
if let Some((config, share)) = prepares.get(&to) {
ctx.send(
platform_id,
PeerMsg {
rack_id,
kind: PeerMsgKind::Prepare { config, share },
to,
PeerMsgKind::Prepare {
config: config.clone(),
share: share.clone(),
},
);
}
Expand Down Expand Up @@ -217,7 +230,6 @@ impl CoordinatorState {
/// What should the coordinator be doing?
pub enum CoordinatorOperation {
// We haven't started implementing this yet
#[expect(unused)]
CollectShares {
epoch: Epoch,
members: BTreeMap<PlatformId, Sha3_256Digest>,
Expand All @@ -226,7 +238,6 @@ pub enum CoordinatorOperation {
},
// We haven't started implementing this yet
// Epoch is always 0
#[allow(unused)]
CollectLrtqShares {
members: BTreeMap<PlatformId, ShareDigestLrtq>,
shares: BTreeMap<PlatformId, LrtqShare>,
Expand All @@ -250,4 +261,14 @@ impl CoordinatorOperation {
CoordinatorOperation::Prepare { .. } => "prepare",
}
}

/// Return the members that have acked prepares, if the current operation
/// is `Prepare`. Otherwise return an empty set.
pub fn acked_prepares(&self) -> BTreeSet<PlatformId> {
if let CoordinatorOperation::Prepare { prepare_acks, .. } = self {
prepare_acks.clone()
} else {
BTreeSet::new()
}
}
}
4 changes: 3 additions & 1 deletion trust-quorum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ mod node_ctx;
mod persistent_state;
mod validators;
pub use configuration::Configuration;
pub(crate) use coordinator_state::CoordinatorState;
pub use coordinator_state::{CoordinatorOperation, CoordinatorState};

pub use crypto::RackSecret;
pub use messages::*;
pub use node::Node;
Expand All @@ -38,6 +39,7 @@ pub use persistent_state::{PersistentState, PersistentStateSummary};
Eq,
PartialOrd,
Ord,
Hash,
Serialize,
Deserialize,
Display,
Expand Down
5 changes: 1 addition & 4 deletions trust-quorum/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{Configuration, Epoch, PlatformId, Threshold};
use gfss::shamir::Share;
use omicron_uuid_kinds::RackUuid;
use serde::{Deserialize, Serialize};
use std::{collections::BTreeSet, time::Duration};
use std::collections::BTreeSet;

/// A request from nexus informing a node to start coordinating a
/// reconfiguration.
Expand All @@ -20,9 +20,6 @@ pub struct ReconfigureMsg {
pub last_committed_epoch: Option<Epoch>,
pub members: BTreeSet<PlatformId>,
pub threshold: Threshold,

// The timeout before we send a follow up request to a peer
pub retry_timeout: Duration,
}

/// Messages sent between trust quorum members over a sprockets channel
Expand Down
Loading
Loading