feat: graceful shutdown #150

Open · wants to merge 11 commits into main

6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -152,6 +152,7 @@ reth-payload-primitives = { git = "https://github.com/scroll-tech/reth.git", def
reth-primitives = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-primitives-traits = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-provider = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-rpc-builder = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-rpc-server-types = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-tasks = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-tokio-util = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
2 changes: 1 addition & 1 deletion crates/database/db/src/db.rs
@@ -450,7 +450,7 @@ mod test {

// Should return the highest safe block (block 201)
let latest_safe = db.get_latest_safe_l2_block().await.unwrap();
assert_eq!(latest_safe, Some(safe_block_2));
assert_eq!(latest_safe, Some((safe_block_2, batch_info)));
}

#[tokio::test]
1 change: 1 addition & 0 deletions crates/database/db/src/models/batch_commit.rs
@@ -65,6 +65,7 @@ impl From<Model> for BatchCommitData {
blob_versioned_hash: value
.blob_hash
.map(|b| b.as_slice().try_into().expect("data persisted in database is valid")),
finalized_block_number: value.finalized_block_number.map(|b| b as u64),
}
}
}
8 changes: 8 additions & 0 deletions crates/database/db/src/models/l2_block.rs
@@ -17,6 +17,14 @@ impl Model {
pub(crate) fn block_info(&self) -> BlockInfo {
BlockInfo { number: self.block_number as u64, hash: B256::from_slice(&self.block_hash) }
}

pub(crate) fn batch_info(&self) -> Option<BatchInfo> {
self.batch_hash.as_ref().map(|hash| BatchInfo {
index: self.batch_index.expect("batch index must be present if batch hash is present")
as u64,
hash: B256::from_slice(hash),
})
}
}

/// The relation for the batch input model.
60 changes: 55 additions & 5 deletions crates/database/db/src/operations.rs
@@ -19,8 +19,16 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
async fn insert_batch(&self, batch_commit: BatchCommitData) -> Result<(), DatabaseError> {
tracing::trace!(target: "scroll::db", batch_hash = ?batch_commit.hash, batch_index = batch_commit.index, "Inserting batch input into database.");
let batch_commit: models::batch_commit::ActiveModel = batch_commit.into();
batch_commit.insert(self.get_connection()).await?;
Ok(())
match models::batch_commit::Entity::insert(batch_commit)
.on_conflict(
OnConflict::column(models::batch_commit::Column::Index).do_nothing().to_owned(),
)
.exec(self.get_connection())
.await
{
Err(sea_orm::DbErr::RecordNotInserted) => Ok(()),
x => Ok(x.map(|_| ())?),
}
}
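
For context, the `on_conflict(...).do_nothing()` pattern above makes `insert_batch` idempotent: when the conflict on `Column::Index` causes the row to be skipped, sea-orm reports `DbErr::RecordNotInserted`, which the match arm converts into success. A minimal sketch of the resulting behaviour, assuming hypothetical `setup_test_db` and `sample_batch` test helpers:

```rust
#[tokio::test]
async fn insert_batch_is_idempotent() {
    // `setup_test_db` and `sample_batch` are assumed stand-ins for this
    // repo's actual test fixtures; assumes `BatchCommitData: Clone`.
    let db = setup_test_db().await;
    let batch = sample_batch();

    db.insert_batch(batch.clone()).await.unwrap();
    // Re-inserting the same batch index hits the conflict on `Column::Index`;
    // sea-orm yields `DbErr::RecordNotInserted`, which is mapped to `Ok(())`.
    db.insert_batch(batch).await.unwrap();
}
```

This matters for the shutdown story: replaying already-indexed L1 events after a restart no longer aborts on a unique-constraint error.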

/// Finalize a [`BatchCommitData`] with the provided `batch_hash` in the database and set the
@@ -113,7 +121,12 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
async fn insert_l1_message(&self, l1_message: L1MessageEnvelope) -> Result<(), DatabaseError> {
tracing::trace!(target: "scroll::db", queue_index = l1_message.transaction.queue_index, "Inserting L1 message into database.");
let l1_message: models::l1_message::ActiveModel = l1_message.into();
l1_message.insert(self.get_connection()).await?;
models::l1_message::Entity::insert(l1_message)
.on_conflict(
OnConflict::column(models::l1_message::Column::QueueIndex).do_nothing().to_owned(),
Collaborator: could we have an edge case where we index an L1 message, shut down, the L1 reorgs, and we don't update it because of the do_nothing directive?

Collaborator (author): good catch! This is a possibility. Let me review and see what makes the most sense, and I will add a proposal here which we can discuss before I implement.
)
.exec(self.get_connection())
.await?;
Ok(())
}
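
On the reorg question above: if the conclusion is to overwrite rather than skip, sea-orm's `OnConflict` also supports an upsert via `update_columns`. A sketch of that alternative (the column names besides `QueueIndex` are assumptions, not taken from this schema):

```rust
use sea_orm::sea_query::OnConflict;

models::l1_message::Entity::insert(l1_message)
    .on_conflict(
        OnConflict::column(models::l1_message::Column::QueueIndex)
            // Overwrite the stale row so a message re-indexed after an L1
            // reorg replaces whatever was persisted before the shutdown.
            .update_columns([
                models::l1_message::Column::Hash,        // assumed column
                models::l1_message::Column::BlockNumber, // assumed column
            ])
            .to_owned(),
    )
    .exec(self.get_connection())
    .await?;
```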

@@ -209,14 +222,24 @@ }
}

/// Get the latest safe L2 [`BlockInfo`] from the database.
async fn get_latest_safe_l2_block(&self) -> Result<Option<BlockInfo>, DatabaseError> {
async fn get_latest_safe_l2_block(
&self,
) -> Result<Option<(BlockInfo, BatchInfo)>, DatabaseError> {
tracing::trace!(target: "scroll::db", "Fetching latest safe L2 block from database.");
Ok(models::l2_block::Entity::find()
.filter(models::l2_block::Column::BatchIndex.is_not_null())
.order_by_desc(models::l2_block::Column::BlockNumber)
.one(self.get_connection())
.await
.map(|x| x.map(|x| x.block_info()))?)
.map(|x| {
x.map(|x| {
(
x.block_info(),
x.batch_info()
.expect("Batch info must be present due to database query arguments"),
)
})
})?)
}

/// Get the latest L2 [`BlockInfo`] from the database.
@@ -229,6 +252,33 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.map(|x| x.map(|x| x.block_info()))?)
}

/// Get the safe block for startup from the database.
///
/// This method fetches the batch info for the latest safe L2 block, then retrieves the
/// highest block of the previous batch (i.e., the batch before the one containing the
/// latest safe block) and returns its block info.
async fn get_startup_safe_block(&self) -> Result<Option<BlockInfo>, DatabaseError> {
tracing::trace!(target: "scroll::db", "Fetching startup safe block from database.");
let safe = if let Some(batch_info) = self
.get_latest_safe_l2_block()
.await?
.map(|(_, batch_info)| batch_info)
.filter(|b| b.index > 1)
{
let batch = self
.get_batch_by_index(batch_info.index - 1)
.await?
.expect("Batch info must be present due to database query arguments");
let block = self.get_highest_block_for_batch(batch.hash).await?;
tracing::info!(target:"test", "{:?}", block);
Collaborator: leftover from debug
block
} else {
None
};

Ok(safe)
}
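
The net effect is that on restart only fully-derived batches are treated as safe. A sketch of the intended call site (the surrounding names, e.g. `genesis_block_info`, are assumptions for illustration):

```rust
// Rewind the safe head to the last block of the batch *before* the latest
// safe batch, so a batch that was only partially processed when the node
// shut down is re-derived from its start.
let safe_head = db
    .get_startup_safe_block()
    .await?
    .unwrap_or(genesis_block_info); // fresh database: fall back to genesis
```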

/// Delete all L2 blocks with a block number greater than the provided block number.
async fn delete_l2_blocks_gt(&self, block_number: u64) -> Result<u64, DatabaseError> {
tracing::trace!(target: "scroll::db", block_number, "Deleting L2 blocks greater than provided block number.");
20 changes: 13 additions & 7 deletions crates/database/migration/src/lib.rs
@@ -24,7 +24,10 @@ impl<MI: MigrationInfo + Send + Sync + 'static> MigratorTrait for Migrator<MI> {
}

pub mod traits {
use crate::{ScrollMainnetMigrationInfo, ScrollSepoliaMigrationInfo};
use crate::{
migration_info::ScrollMainnetTestMigrationInfo, ScrollMainnetMigrationInfo,
ScrollSepoliaMigrationInfo,
};
use reth_chainspec::NamedChain;
use sea_orm::{prelude::async_trait::async_trait, DatabaseConnection, DbErr};
use sea_orm_migration::MigratorTrait;
@@ -33,20 +36,23 @@ pub mod traits {
#[async_trait]
pub trait ScrollMigrator {
/// Migrates the tables.
async fn migrate(&self, conn: &DatabaseConnection) -> Result<(), DbErr>;
async fn migrate(&self, conn: &DatabaseConnection, test: bool) -> Result<(), DbErr>;
}

#[async_trait]
impl ScrollMigrator for NamedChain {
async fn migrate(&self, conn: &DatabaseConnection) -> Result<(), DbErr> {
match self {
NamedChain::Scroll => {
async fn migrate(&self, conn: &DatabaseConnection, test: bool) -> Result<(), DbErr> {
match (self, test) {
(NamedChain::Scroll, false) => {
Ok(super::Migrator::<ScrollMainnetMigrationInfo>::up(conn, None))
}
NamedChain::ScrollSepolia => {
(NamedChain::Scroll, true) => {
Ok(super::Migrator::<ScrollMainnetTestMigrationInfo>::up(conn, None))
}
(NamedChain::ScrollSepolia, _) => {
Ok(super::Migrator::<ScrollSepoliaMigrationInfo>::up(conn, None))
}
NamedChain::Dev => Ok(super::Migrator::<()>::up(conn, None)),
(NamedChain::Dev, _) => Ok(super::Migrator::<()>::up(conn, None)),
_ => Err(DbErr::Custom("expected Scroll Mainnet, Sepolia or Dev".into())),
}?
.await
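
A hypothetical call site for the extended trait (assumes an open `conn: DatabaseConnection`):

```rust
use reth_chainspec::NamedChain;

// `test = true` selects the bundled SQL sample for Scroll mainnet instead of
// the S3 download; Sepolia and Dev ignore the flag.
NamedChain::Scroll.migrate(&conn, true).await?;
```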
@@ -1,4 +1,4 @@
use crate::MigrationInfo;
use crate::{migration_info::DataSource, MigrationInfo};
use std::{collections::HashMap, time::Duration};

use alloy_primitives::{bytes::Buf, B256};
@@ -23,31 +23,37 @@ impl<MI> MigrationName for Migration<MI> {
#[async_trait::async_trait]
impl<MI: MigrationInfo + Send + Sync> MigrationTrait for Migration<MI> {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
if let (Some(url), Some(hash)) = (MI::data_url(), MI::data_hash()) {
// download data.
let file = download(&url).await.map_err(|err| DbErr::Custom(err.to_string()))?;
// verify hash of data.
verify_data_hash(hash, &file).map_err(|err| DbErr::Custom(err.to_string()))?;

// decode data and convert to database model.
let records: Vec<ActiveModel> = decode_to_headers(file)
.map_err(|err| DbErr::Custom(err.to_string()))?
.into_iter()
.enumerate()
.map(|(i, h)| (i as i64, h).into())
.collect();

let db = manager.get_connection();

// batch the insertion to avoid `too many SQL variables` error.
const MAX_BATCH_SIZE: usize = 3000;
let mut cursor = 0;
while cursor < records.len() {
let start = cursor;
let end = (start + MAX_BATCH_SIZE).min(records.len());
Entity::insert_many(records[start..end].to_vec()).exec(db).await?;
cursor = end;
match (MI::data_source(), MI::data_hash()) {
(Some(DataSource::Url(url)), Some(hash)) => {
// download data.
let file = download(&url).await.map_err(|err| DbErr::Custom(err.to_string()))?;
// verify hash of data.
verify_data_hash(hash, &file).map_err(|err| DbErr::Custom(err.to_string()))?;

// decode data and convert to database model.
let records: Vec<ActiveModel> = decode_to_headers(file)
.map_err(|err| DbErr::Custom(err.to_string()))?
.into_iter()
.enumerate()
.map(|(i, h)| (i as i64, h).into())
.collect();

let db = manager.get_connection();

// batch the insertion to avoid `too many SQL variables` error.
const MAX_BATCH_SIZE: usize = 3000;
let mut cursor = 0;
while cursor < records.len() {
let start = cursor;
let end = (start + MAX_BATCH_SIZE).min(records.len());
Entity::insert_many(records[start..end].to_vec()).exec(db).await?;
cursor = end;
}
}
(Some(DataSource::Sql(sql)), _) => {
manager.get_connection().execute_unprepared(&sql).await?;
}
_ => (),
}

Ok(())
@@ -94,7 +100,7 @@ async fn download(url: &str) -> eyre::Result<Vec<u8>> {
.build();

const CHUNK_SIZE: u64 = 16_000_000;
const MAX_TASKS: usize = 32;
const MAX_TASKS: usize = 4;

// get file size and verify range support.
let total_size = get_file_size(&client, url).await?;
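
For a sense of scale under the new constants (illustrative numbers, not from this diff):

```rust
// A 100 MB file fetched in 16 MB ranges needs 7 requests, of which at most
// MAX_TASKS = 4 now run concurrently (down from 32).
const CHUNK_SIZE: u64 = 16_000_000;
let total_size: u64 = 100_000_000;
assert_eq!(total_size.div_ceil(CHUNK_SIZE), 7);
```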
33 changes: 27 additions & 6 deletions crates/database/migration/src/migration_info.rs
@@ -1,12 +1,17 @@
use alloy_primitives::{b256, B256};

pub enum DataSource {
Url(String),
Sql(String),
}

pub trait MigrationInfo {
fn data_url() -> Option<String>;
fn data_source() -> Option<DataSource>;
fn data_hash() -> Option<B256>;
}

impl MigrationInfo for () {
fn data_url() -> Option<String> {
fn data_source() -> Option<DataSource> {
None
}

@@ -19,21 +24,37 @@ impl MigrationInfo for () {
pub struct ScrollMainnetMigrationInfo;

impl MigrationInfo for ScrollMainnetMigrationInfo {
fn data_url() -> Option<String> {
Some("https://scroll-block-missing-metadata.s3.us-west-2.amazonaws.com/534352.bin".into())
fn data_source() -> Option<DataSource> {
Some(DataSource::Url(
"https://scroll-block-missing-metadata.s3.us-west-2.amazonaws.com/534352.bin".into(),
))
}

fn data_hash() -> Option<B256> {
Some(b256!("9062e2fa1200dca63bee1d18d429572f134f5f0c98cb4852f62fc394e33cf6e6"))
}
}

pub struct ScrollMainnetTestMigrationInfo;

impl MigrationInfo for ScrollMainnetTestMigrationInfo {
fn data_source() -> Option<DataSource> {
Some(DataSource::Sql(include_str!(".././testdata/mainnet-sample.sql").into()))
}

fn data_hash() -> Option<B256> {
None
}
}

/// The type implementing migration info for Sepolia.
pub struct ScrollSepoliaMigrationInfo;

impl MigrationInfo for ScrollSepoliaMigrationInfo {
fn data_url() -> Option<String> {
Some("https://scroll-block-missing-metadata.s3.us-west-2.amazonaws.com/534351.bin".into())
fn data_source() -> Option<DataSource> {
Some(DataSource::Url(
"https://scroll-block-missing-metadata.s3.us-west-2.amazonaws.com/534351.bin".into(),
))
}

fn data_hash() -> Option<B256> {