From 80b0f88605f17bb1edeb0f77b6bdf7f9e8bc05cf Mon Sep 17 00:00:00 2001 From: Pedro M Duarte Date: Fri, 11 Apr 2025 16:40:26 -0400 Subject: [PATCH 1/2] Migrate interop ETL tasks from transforms to blockbatch daily --- src/op_analytics/dagster/assets/transforms.py | 21 +++-- .../etl/blockbatchloaddaily/datasets.py | 42 +++++++++ .../dim_erc20_first_seen_v1__CREATE.sql} | 0 .../dim_erc20_first_seen_v1__INSERT.sql} | 6 +- .../dim_erc20_ntt_first_seen_v1__CREATE.sql} | 0 .../dim_erc20_ntt_first_seen_v1__INSERT.sql} | 5 +- .../dim_erc20_oft_first_seen_v1__CREATE.sql} | 0 .../dim_erc20_oft_first_seen_v1__INSERT.sql} | 5 +- .../fact_erc20_ntt_transfers_v1__CREATE.sql} | 0 .../fact_erc20_ntt_transfers_v1__INSERT.sql} | 16 +--- .../fact_erc20_oft_transfers_v1__CREATE.sql} | 0 .../fact_erc20_oft_transfers_v1__INSERT.sql} | 15 +--- .../blockbatchloaddaily/loadspec_datechain.py | 2 +- .../create/fact_ntt_delivery_events_v1.sql | 19 ---- .../create/fact_oft_sent_events_v1.sql | 20 ----- .../dim_erc20_first_seen_v1.sql | 25 ++++++ .../dim_erc20_ntt_first_seen_v1.sql | 25 ++++++ .../dim_erc20_oft_first_seen_v1.sql | 25 ++++++ .../fact_erc20_ntt_transfers_v1.sql | 87 +++++++++++++++++++ .../fact_erc20_oft_transfers_v1.sql | 70 +++++++++++++++ .../test_interop_queries.py | 43 +++++++++ 21 files changed, 340 insertions(+), 86 deletions(-) rename src/op_analytics/{transforms/interop/create/dim_erc20_first_seen_v1.sql => datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_first_seen_v1__CREATE.sql} (100%) rename src/op_analytics/{transforms/interop/update/01_dim_erc20_first_seen_v1.sql => datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_first_seen_v1__INSERT.sql} (70%) rename src/op_analytics/{transforms/interop/create/dim_erc20_ntt_first_seen_v1.sql => datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_ntt_first_seen_v1__CREATE.sql} (100%) rename src/op_analytics/{transforms/interop/update/04_dim_erc20_ntt_first_seen_v1.sql => datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_ntt_first_seen_v1__INSERT.sql} (81%) rename src/op_analytics/{transforms/interop/create/dim_erc20_oft_first_seen_v1.sql => datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_oft_first_seen_v1__CREATE.sql} (100%) rename src/op_analytics/{transforms/interop/update/05_dim_erc20_oft_first_seen_v1.sql => datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_oft_first_seen_v1__INSERT.sql} (81%) rename src/op_analytics/{transforms/interop/create/fact_erc20_ntt_transfers_v1.sql => datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/fact_erc20_ntt_transfers_v1__CREATE.sql} (100%) rename src/op_analytics/{transforms/interop/update/03_fact_erc20_ntt_transfers_v1.sql => datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/fact_erc20_ntt_transfers_v1__INSERT.sql} (87%) rename src/op_analytics/{transforms/interop/create/fact_erc20_oft_transfers_v1.sql => datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/fact_erc20_oft_transfers_v1__CREATE.sql} (100%) rename src/op_analytics/{transforms/interop/update/02_fact_erc20_oft_transfers_v1.sql => datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/fact_erc20_oft_transfers_v1__INSERT.sql} (78%) delete mode 100644 src/op_analytics/transforms/interop/create/fact_ntt_delivery_events_v1.sql delete mode 100644 src/op_analytics/transforms/interop/create/fact_oft_sent_events_v1.sql create mode 100644 tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/dim_erc20_first_seen_v1.sql create mode 100644 tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/dim_erc20_ntt_first_seen_v1.sql create mode 100644 tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/dim_erc20_oft_first_seen_v1.sql create mode 100644 tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/fact_erc20_ntt_transfers_v1.sql create mode 100644 tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/fact_erc20_oft_transfers_v1.sql create mode 100644 tests/op_analytics/datapipeline/etl/blockbatchloaddaily/test_interop_queries.py diff --git a/src/op_analytics/dagster/assets/transforms.py b/src/op_analytics/dagster/assets/transforms.py index fc6978c8e3d..8a7ebffe6fe 100644 --- a/src/op_analytics/dagster/assets/transforms.py +++ b/src/op_analytics/dagster/assets/transforms.py @@ -4,6 +4,7 @@ ) from op_analytics.transforms.main import execute_dt_transforms +from op_analytics.datapipeline.etl.blockbatchloaddaily.main import daily_to_clickhouse @asset @@ -17,15 +18,19 @@ def erc20transfers(context: AssetExecutionContext): def interop(context: AssetExecutionContext): """Run interop dataset transformations.""" - # Run all steps except for 6 and 7. - result = execute_dt_transforms( - group_name="interop", - force_complete=True, - range_spec="m2days", - steps_to_run=None, - steps_to_skip=[6, 7], + from op_analytics.datapipeline.etl.blockbatchloaddaily.datasets import ( + INTEROP_ERC20_FIRST_SEEN, + INTEROP_NTT_FIRST_SEEN, + INTEROP_OFT_FIRST_SEEN, + INTEROP_NTT_TRANSFERS, + INTEROP_OFT_TRANSFERS, ) - context.log.info(result) + + daily_to_clickhouse(dataset=INTEROP_ERC20_FIRST_SEEN) + daily_to_clickhouse(dataset=INTEROP_NTT_TRANSFERS) + daily_to_clickhouse(dataset=INTEROP_OFT_TRANSFERS) + daily_to_clickhouse(dataset=INTEROP_NTT_FIRST_SEEN) + daily_to_clickhouse(dataset=INTEROP_OFT_FIRST_SEEN) # For step 6 we need a back-dated run. What we do is detect ERC-20 create traces # for conracts that have had at least one ERC-20 transfer. If we run at the present diff --git a/src/op_analytics/datapipeline/etl/blockbatchloaddaily/datasets.py b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/datasets.py index a9488922abc..f16cb2ce834 100644 --- a/src/op_analytics/datapipeline/etl/blockbatchloaddaily/datasets.py +++ b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/datasets.py @@ -80,3 +80,45 @@ output_root_path="transforms_dummy/daily_counts_v0", inputs_clickhouse=["blockbatch_daily/aggtxs/daily_address_summary_v1"], ) + + +################################################################################ +# Interop datasets +################################################################################ + +RAW_LOGS = "ingestion/logs_v1" +ERC20_TRANSFERS = "blockbatch/token_transfers/erc20_transfers_v1" + + +INTEROP_ERC20_FIRST_SEEN = ClickHouseDateChainETL( + output_root_path="transforms_interop/dim_erc20_first_seen_v1", + inputs_clickhouse=[ERC20_TRANSFERS], +) + +INTEROP_NTT_TRANSFERS = ClickHouseDateChainETL( + output_root_path="transforms_interop/fact_erc20_ntt_transfers_v1", + inputs_blockbatch=[RAW_LOGS], + inputs_clickhouse=[ERC20_TRANSFERS], +) + +INTEROP_OFT_TRANSFERS = ClickHouseDateChainETL( + output_root_path="transforms_interop/fact_erc20_oft_transfers_v1", + inputs_blockbatch=[ + "ingestion/logs_v1", + ], + inputs_clickhouse=[ERC20_TRANSFERS], +) + +INTEROP_NTT_FIRST_SEEN = ClickHouseDateChainETL( + output_root_path="transforms_interop/dim_erc20_ntt_first_seen_v1", + inputs_clickhouse=[ + INTEROP_NTT_TRANSFERS.output_root_path, + ], +) + +INTEROP_OFT_FIRST_SEEN = ClickHouseDateChainETL( + output_root_path="transforms_interop/dim_erc20_oft_first_seen_v1", + inputs_clickhouse=[ + INTEROP_OFT_TRANSFERS.output_root_path, + ], +) diff --git a/src/op_analytics/transforms/interop/create/dim_erc20_first_seen_v1.sql b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_first_seen_v1__CREATE.sql similarity index 100% rename from src/op_analytics/transforms/interop/create/dim_erc20_first_seen_v1.sql rename to src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_first_seen_v1__CREATE.sql diff --git a/src/op_analytics/transforms/interop/update/01_dim_erc20_first_seen_v1.sql b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_first_seen_v1__INSERT.sql similarity index 70% rename from src/op_analytics/transforms/interop/update/01_dim_erc20_first_seen_v1.sql rename to src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_first_seen_v1__INSERT.sql index 3a0617cf1c7..29d94192b5d 100644 --- a/src/op_analytics/transforms/interop/update/01_dim_erc20_first_seen_v1.sql +++ b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_first_seen_v1__INSERT.sql @@ -4,8 +4,6 @@ Update first seen erc20 transfers. */ -INSERT INTO _placeholder_ - SELECT chain , chain_id @@ -15,7 +13,5 @@ SELECT , min(block_timestamp) AS first_seen , dateDiff('second', first_seen, '2106-01-01 00:00:00'::DATETIME) AS row_version -FROM blockbatch.token_transfers__erc20_transfers_v1 -WHERE - dt = {dtparam:Date} -- noqa: LT01,CP02 +FROM INPUT_CLICKHOUSE('blockbatch/token_transfers/erc20_transfers_v1') GROUP BY 1, 2, 3 diff --git a/src/op_analytics/transforms/interop/create/dim_erc20_ntt_first_seen_v1.sql b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_ntt_first_seen_v1__CREATE.sql similarity index 100% rename from src/op_analytics/transforms/interop/create/dim_erc20_ntt_first_seen_v1.sql rename to src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_ntt_first_seen_v1__CREATE.sql diff --git a/src/op_analytics/transforms/interop/update/04_dim_erc20_ntt_first_seen_v1.sql b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_ntt_first_seen_v1__INSERT.sql similarity index 81% rename from src/op_analytics/transforms/interop/update/04_dim_erc20_ntt_first_seen_v1.sql rename to src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_ntt_first_seen_v1__INSERT.sql index 2e0b68c409a..4dce469afec 100644 --- a/src/op_analytics/transforms/interop/update/04_dim_erc20_ntt_first_seen_v1.sql +++ b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_ntt_first_seen_v1__INSERT.sql @@ -4,9 +4,6 @@ Aggregated to keep only the first seen value for each token contract address. */ -INSERT INTO _placeholder_ - - SELECT chain , chain_id @@ -16,5 +13,5 @@ SELECT , min(block_timestamp) AS first_seen , dateDiff('second', first_seen, '2106-01-01 00:00:00'::DATETIME) AS row_version -FROM transforms_interop.fact_erc20_ntt_transfers_v1 +FROM INPUT_CLICKHOUSE('transforms_interop/fact_erc20_ntt_transfers_v1') GROUP BY 1, 2, 3 diff --git a/src/op_analytics/transforms/interop/create/dim_erc20_oft_first_seen_v1.sql b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_oft_first_seen_v1__CREATE.sql similarity index 100% rename from src/op_analytics/transforms/interop/create/dim_erc20_oft_first_seen_v1.sql rename to src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_oft_first_seen_v1__CREATE.sql diff --git a/src/op_analytics/transforms/interop/update/05_dim_erc20_oft_first_seen_v1.sql b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_oft_first_seen_v1__INSERT.sql similarity index 81% rename from src/op_analytics/transforms/interop/update/05_dim_erc20_oft_first_seen_v1.sql rename to src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_oft_first_seen_v1__INSERT.sql index b147f8fa702..d66310d2a71 100644 --- a/src/op_analytics/transforms/interop/update/05_dim_erc20_oft_first_seen_v1.sql +++ b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/dim_erc20_oft_first_seen_v1__INSERT.sql @@ -4,9 +4,6 @@ Aggregated to keep only the first seen value for each token contract address. */ -INSERT INTO _placeholder_ - - SELECT chain , chain_id @@ -16,5 +13,5 @@ SELECT , min(block_timestamp) AS first_seen , dateDiff('second', first_seen, '2106-01-01 00:00:00'::DATETIME) AS row_version -FROM transforms_interop.fact_erc20_oft_transfers_v1 +FROM INPUT_CLICKHOUSE('transforms_interop/fact_erc20_oft_transfers_v1') GROUP BY 1, 2, 3 diff --git a/src/op_analytics/transforms/interop/create/fact_erc20_ntt_transfers_v1.sql b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/fact_erc20_ntt_transfers_v1__CREATE.sql similarity index 100% rename from src/op_analytics/transforms/interop/create/fact_erc20_ntt_transfers_v1.sql rename to src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/fact_erc20_ntt_transfers_v1__CREATE.sql diff --git a/src/op_analytics/transforms/interop/update/03_fact_erc20_ntt_transfers_v1.sql b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/fact_erc20_ntt_transfers_v1__INSERT.sql similarity index 87% rename from src/op_analytics/transforms/interop/update/03_fact_erc20_ntt_transfers_v1.sql rename to src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/fact_erc20_ntt_transfers_v1__INSERT.sql index feb9f77c56f..5e0742fddc6 100644 --- a/src/op_analytics/transforms/interop/update/03_fact_erc20_ntt_transfers_v1.sql +++ b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/fact_erc20_ntt_transfers_v1__INSERT.sql @@ -20,8 +20,6 @@ https://optimistic.etherscan.io/tx/0x9ae78927d9771a2bcd89fc9eb467c063753dc30214d */ -INSERT INTO _placeholder_ - WITH @@ -33,13 +31,7 @@ ntt_delivery_events AS ( -- noqa: ST03 , transaction_hash , log_index - FROM - blockbatch_gcs.read_date( - rootpath = 'ingestion/logs_v1' - , chain = '*' - , dt = { dtparam: Date } - ) - + FROM INPUT_BLOCKBATCH('ingestion/logs_v1') WHERE -- -- Delivery( @@ -71,13 +63,11 @@ SELECT , t.from_address , t.to_address -FROM blockbatch.token_transfers__erc20_transfers_v1 AS t - +FROM INPUT_CLICKHOUSE('blockbatch/token_transfers/erc20_transfers_v1') AS t INNER JOIN ntt_delivery_events AS n ON t.chain_id = n.chain_id AND t.transaction_hash = n.transaction_hash WHERE - t.dt = { dtparam: Date } -- Transfer is before Delivery - AND t.log_index < n.log_index + t.log_index < n.log_index diff --git a/src/op_analytics/transforms/interop/create/fact_erc20_oft_transfers_v1.sql b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/fact_erc20_oft_transfers_v1__CREATE.sql similarity index 100% rename from src/op_analytics/transforms/interop/create/fact_erc20_oft_transfers_v1.sql rename to src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/fact_erc20_oft_transfers_v1__CREATE.sql diff --git a/src/op_analytics/transforms/interop/update/02_fact_erc20_oft_transfers_v1.sql b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/fact_erc20_oft_transfers_v1__INSERT.sql similarity index 78% rename from src/op_analytics/transforms/interop/update/02_fact_erc20_oft_transfers_v1.sql rename to src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/fact_erc20_oft_transfers_v1__INSERT.sql index 197f0664394..b2b866413be 100644 --- a/src/op_analytics/transforms/interop/update/02_fact_erc20_oft_transfers_v1.sql +++ b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/ddl/transforms_interop/fact_erc20_oft_transfers_v1__INSERT.sql @@ -9,8 +9,6 @@ conver OFTAdapter tokens. */ -INSERT INTO _placeholder_ - WITH oft_sent_events AS ( -- noqa: ST03 @@ -19,13 +17,7 @@ oft_sent_events AS ( -- noqa: ST03 , transaction_hash , address AS contract_address - FROM - blockbatch_gcs.read_date( - rootpath = 'ingestion/logs_v1' - , chain = '*' - , dt = { dtparam: Date } - ) - + FROM INPUT_BLOCKBATCH('ingestion/logs_v1') WHERE -- OFT Docs: -- https://docs.layerzero.network/v2/home/token-standards/oft-standard @@ -59,7 +51,6 @@ SELECT , t.from_address , t.to_address -FROM blockbatch.token_transfers__erc20_transfers_v1 AS t +FROM INPUT_CLICKHOUSE('blockbatch/token_transfers/erc20_transfers_v1') AS t WHERE - t.dt = { dtparam: Date } - AND (t.chain_id, t.transaction_hash, t.contract_address) IN (oft_sent_events) + (t.chain_id, t.transaction_hash, t.contract_address) IN (oft_sent_events) diff --git a/src/op_analytics/datapipeline/etl/blockbatchloaddaily/loadspec_datechain.py b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/loadspec_datechain.py index d4ac3acc399..aca49adc789 100644 --- a/src/op_analytics/datapipeline/etl/blockbatchloaddaily/loadspec_datechain.py +++ b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/loadspec_datechain.py @@ -27,7 +27,7 @@ class ETLMixin: def __post_init__(self): for root_path in self.inputs_blockbatch: - if not root_path.startswith("blockbatch/"): + if not (root_path.startswith("blockbatch/") or root_path.startswith("ingestion/")): raise ValueError(f"Invalid blockbatch input: {root_path}") for root_path in self.inputs_clickhouse: diff --git a/src/op_analytics/transforms/interop/create/fact_ntt_delivery_events_v1.sql b/src/op_analytics/transforms/interop/create/fact_ntt_delivery_events_v1.sql deleted file mode 100644 index 45fdeb41809..00000000000 --- a/src/op_analytics/transforms/interop/create/fact_ntt_delivery_events_v1.sql +++ /dev/null @@ -1,19 +0,0 @@ -CREATE TABLE IF NOT EXISTS _placeholder_ -( - `dt` Date, - `chain` String, - `chain_id` Int32, - `network` String, - `block_timestamp` DateTime CODEC(Delta(4), ZSTD(1)), - `block_number` UInt64 CODEC(Delta(4), ZSTD(1)), - `transaction_hash` FixedString(66), - `transaction_index` Int64, - `log_index` Int64, - `contract_address` FixedString(42), - `topic0` FixedString(66), - `event_name` String, - INDEX dt_idx dt TYPE minmax GRANULARITY 1, - INDEX chain_idx chain TYPE minmax GRANULARITY 1, -) -ENGINE = ReplacingMergeTree -ORDER BY (dt, chain, chain_id, network, block_number, transaction_hash, transaction_index, log_index) diff --git a/src/op_analytics/transforms/interop/create/fact_oft_sent_events_v1.sql b/src/op_analytics/transforms/interop/create/fact_oft_sent_events_v1.sql deleted file mode 100644 index d728d0a1134..00000000000 --- a/src/op_analytics/transforms/interop/create/fact_oft_sent_events_v1.sql +++ /dev/null @@ -1,20 +0,0 @@ -CREATE TABLE IF NOT EXISTS _placeholder_ -( - `dt` Date, - `chain` String, - `chain_id` Int32, - `network` String, - `block_timestamp` DateTime CODEC(Delta(4), ZSTD(1)), - `block_number` UInt64 CODEC(Delta(4), ZSTD(1)), - `transaction_hash` FixedString(66), - `transaction_index` Int64, - `log_index` Int64, - `contract_address` FixedString(42), - `topic0` FixedString(66), - `event_name` String, - INDEX dt_idx dt TYPE minmax GRANULARITY 1, - INDEX chain_idx chain TYPE minmax GRANULARITY 1, -) - -ENGINE = ReplacingMergeTree -ORDER BY (dt, chain, chain_id, network, block_number, transaction_hash, transaction_index, log_index) diff --git a/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/dim_erc20_first_seen_v1.sql b/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/dim_erc20_first_seen_v1.sql new file mode 100644 index 00000000000..744591996b5 --- /dev/null +++ b/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/dim_erc20_first_seen_v1.sql @@ -0,0 +1,25 @@ +INSERT INTO transforms_interop.dim_erc20_first_seen_v1 +/** + +Update first seen erc20 transfers. + +*/ + +SELECT + chain + , chain_id + , contract_address + + -- transfers seen earlier have greater ReplacingMergeTree row_version + , min(block_timestamp) AS first_seen + , dateDiff('second', first_seen, '2106-01-01 00:00:00'::DATETIME) AS row_version + +FROM + ( + SELECT + * + FROM blockbatch.token_transfers__erc20_transfers_v1 + WHERE dt = '2025-01-01' AND chain = 'base' + ) + +GROUP BY 1, 2, 3 diff --git a/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/dim_erc20_ntt_first_seen_v1.sql b/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/dim_erc20_ntt_first_seen_v1.sql new file mode 100644 index 00000000000..78ac13253cf --- /dev/null +++ b/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/dim_erc20_ntt_first_seen_v1.sql @@ -0,0 +1,25 @@ +INSERT INTO transforms_interop.dim_erc20_ntt_first_seen_v1 +/** + +Aggregated to keep only the first seen value for each token contract address. + +*/ + +SELECT + chain + , chain_id + , contract_address + + -- events seen earlier have greater ReplacingMergeTree row_version + , min(block_timestamp) AS first_seen + , dateDiff('second', first_seen, '2106-01-01 00:00:00'::DATETIME) AS row_version + +FROM + ( + SELECT + * + FROM transforms_interop.fact_erc20_ntt_transfers_v1 + WHERE dt = '2025-01-01' AND chain = 'base' + ) + +GROUP BY 1, 2, 3 diff --git a/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/dim_erc20_oft_first_seen_v1.sql b/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/dim_erc20_oft_first_seen_v1.sql new file mode 100644 index 00000000000..678ecf594e8 --- /dev/null +++ b/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/dim_erc20_oft_first_seen_v1.sql @@ -0,0 +1,25 @@ +INSERT INTO transforms_interop.dim_erc20_oft_first_seen_v1 +/** + +Aggregated to keep only the first seen value for each token contract address. + +*/ + +SELECT + chain + , chain_id + , contract_address + + -- events seen earlier have greater ReplacingMergeTree row_version + , min(block_timestamp) AS first_seen + , dateDiff('second', first_seen, '2106-01-01 00:00:00'::DATETIME) AS row_version + +FROM + ( + SELECT + * + FROM transforms_interop.fact_erc20_oft_transfers_v1 + WHERE dt = '2025-01-01' AND chain = 'base' + ) + +GROUP BY 1, 2, 3 diff --git a/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/fact_erc20_ntt_transfers_v1.sql b/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/fact_erc20_ntt_transfers_v1.sql new file mode 100644 index 00000000000..5e060652c82 --- /dev/null +++ b/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/fact_erc20_ntt_transfers_v1.sql @@ -0,0 +1,87 @@ +INSERT INTO transforms_interop.fact_erc20_ntt_transfers_v1 +/** + +ERC-20 Transfer transactions that also emit an NTT Delivery event. + + +IMPORTANT: this approach looks for transactions where the same transaction has +an ERC-20 Transfer followed by an NTT Delivery event. It assumes that contracts that +emit Transfer events prior to a Delivery event in a transaciton are NTT-enabled +token contracts. + +Transactions may be simple, with just one Transfer + Delivery, for example: + +https://basescan.org/tx/0x3717e2df7d7f070254d5f477a94012f8d17417a2ff6e0f0df7daa767d851808c#eventlog +https://basescan.org/tx/0x39f958600df4faff77320801daea1c9757209f93e653b545d3598596077ad1b8#eventlog + +But we can also have mor complex transactions that have several Transfer events +before the Delivery event, for example: + +https://optimistic.etherscan.io/tx/0x9ae78927d9771a2bcd89fc9eb467c063753dc30214d4b858e0cb6e02151dc592#eventlog + +*/ + +WITH + + +-- NOTE: The Delivery event is not sent by the token contract, it is sent +-- by the Wormhole Relayer or a proxy. +ntt_delivery_events AS ( -- noqa: ST03 + SELECT + chain_id + , transaction_hash + , log_index + + FROM s3( + 'https://storage.googleapis.com/oplabs-tools-data-sink/ingestion/logs_v1/chain=base/dt=2025-01-01/*.parquet', + '', + '', + 'parquet' + ) + + WHERE + -- + -- Delivery( + -- index_topic_1 address recipientContract, + -- index_topic_2 uint16 sourceChain, + -- index_topic_3 uint64 sequence, + -- bytes32 deliveryVaaHash, + -- uint8 status, + -- uint256 gasUsed, + -- uint8 refundStatus, + -- bytes additionalStatusInfo, + -- bytes overridesInfo + -- ) + topic0 = '0xbccc00b713f54173962e7de6098f643d8ebf53d488d71f4b2a5171496d038f9e' +) + +SELECT + t.dt + , t.chain + , t.chain_id + , t.network + , t.block_timestamp + , t.block_number + , t.transaction_hash + , t.transaction_index + , t.log_index + , t.contract_address + , t.amount + , t.from_address + , t.to_address + +FROM + ( + SELECT + * + FROM blockbatch.token_transfers__erc20_transfers_v1 + WHERE dt = '2025-01-01' AND chain = 'base' + ) + AS t +INNER JOIN ntt_delivery_events AS n + ON + t.chain_id = n.chain_id + AND t.transaction_hash = n.transaction_hash +WHERE + -- Transfer is before Delivery + t.log_index < n.log_index diff --git a/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/fact_erc20_oft_transfers_v1.sql b/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/fact_erc20_oft_transfers_v1.sql new file mode 100644 index 00000000000..d8a47ede72a --- /dev/null +++ b/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/rendered/transforms_interop/fact_erc20_oft_transfers_v1.sql @@ -0,0 +1,70 @@ +INSERT INTO transforms_interop.fact_erc20_oft_transfers_v1 +/** + +ERC-20 Transfer transactions that also emit an OFTSent event. + + +IMPORTANT: this approach filters for cases when the same token contract emits +the OFTSent and the Transfer event. So itonly covers OFT Tokens and it does not +conver OFTAdapter tokens. + +*/ + +WITH + +oft_sent_events AS ( -- noqa: ST03 + SELECT + chain_id + , transaction_hash + , address AS contract_address + + FROM s3( + 'https://storage.googleapis.com/oplabs-tools-data-sink/ingestion/logs_v1/chain=base/dt=2025-01-01/*.parquet', + '', + '', + 'parquet' + ) + + WHERE + -- OFT Docs: + -- https://docs.layerzero.network/v2/home/token-standards/oft-standard + -- + -- Example Log: + -- https://optimistic.etherscan.io/tx/0x40ddae2718940c4487af4c02d889510ab47e2e423028b76a3b00ec9bc8c04798#eventlog#21 + -- + -- Signature: + -- OFTSent ( + -- index_topic_1 bytes32 guid, + -- uint32 dstEid, + -- index_topic_2 address fromAddress, + -- uint256 amountSentLD, + -- uint256 amountReceivedLD + -- ) + topic0 = '0x85496b760a4b7f8d66384b9df21b381f5d1b1e79f229a47aaf4c232edc2fe59a' +) + +SELECT + t.dt + , t.chain + , t.chain_id + , t.network + , t.block_timestamp + , t.block_number + , t.transaction_hash + , t.transaction_index + , t.log_index + , t.contract_address + , t.amount + , t.from_address + , t.to_address + +FROM + ( + SELECT + * + FROM blockbatch.token_transfers__erc20_transfers_v1 + WHERE dt = '2025-01-01' AND chain = 'base' + ) + AS t +WHERE + (t.chain_id, t.transaction_hash, t.contract_address) IN (oft_sent_events) diff --git a/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/test_interop_queries.py b/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/test_interop_queries.py new file mode 100644 index 00000000000..876d0d1d121 --- /dev/null +++ b/tests/op_analytics/datapipeline/etl/blockbatchloaddaily/test_interop_queries.py @@ -0,0 +1,43 @@ +import os + +from op_analytics.datapipeline.etl.blockbatchloaddaily.datasets import ( + INTEROP_NTT_FIRST_SEEN, + INTEROP_OFT_FIRST_SEEN, + INTEROP_NTT_TRANSFERS, + INTEROP_OFT_TRANSFERS, + INTEROP_ERC20_FIRST_SEEN, +) + +from op_analytics.datapipeline.etl.blockbatchloaddaily.loadspec_datechain import DateChainBatch + + +def test_interop_queries(): + datasets = [ + INTEROP_NTT_FIRST_SEEN, + INTEROP_OFT_FIRST_SEEN, + INTEROP_NTT_TRANSFERS, + INTEROP_OFT_TRANSFERS, + INTEROP_ERC20_FIRST_SEEN, + ] + + for dataset in datasets: + ddl = dataset.insert_ddl_template( + batch=DateChainBatch.of(chain="base", dt="2025-01-01"), + dry_run=True, + ) + + expectation_file = os.path.join( + os.path.dirname(__file__), f"rendered/{dataset.output_root_path}.sql" + ) + + if not os.path.exists(expectation_file): + with open(expectation_file, "w") as f: + f.write(ddl) + + with open(expectation_file, "r") as f: + expected_ddl = f.read() + + if ddl != expected_ddl: + print(ddl) + + assert ddl == expected_ddl From af5a469d8b8e0df41127f39d1bab3bf63c05c995 Mon Sep 17 00:00:00 2001 From: Pedro M Duarte Date: Fri, 11 Apr 2025 16:59:20 -0400 Subject: [PATCH 2/2] Fix tests --- src/op_analytics/dagster/assets/transforms.py | 10 +++++----- .../blockbatchloaddaily/loadspec_datechain.py | 2 +- .../coreutils/clickhouse/test_read_ddl.py | 6 +++--- tests/op_analytics/transforms/test_create.py | 6 +++--- tests/op_analytics/transforms/test_execute.py | 18 +++++------------- 5 files changed, 17 insertions(+), 25 deletions(-) diff --git a/src/op_analytics/dagster/assets/transforms.py b/src/op_analytics/dagster/assets/transforms.py index 8a7ebffe6fe..490e76cae45 100644 --- a/src/op_analytics/dagster/assets/transforms.py +++ b/src/op_analytics/dagster/assets/transforms.py @@ -26,11 +26,11 @@ def interop(context: AssetExecutionContext): INTEROP_OFT_TRANSFERS, ) - daily_to_clickhouse(dataset=INTEROP_ERC20_FIRST_SEEN) - daily_to_clickhouse(dataset=INTEROP_NTT_TRANSFERS) - daily_to_clickhouse(dataset=INTEROP_OFT_TRANSFERS) - daily_to_clickhouse(dataset=INTEROP_NTT_FIRST_SEEN) - daily_to_clickhouse(dataset=INTEROP_OFT_FIRST_SEEN) + daily_to_clickhouse(dataset=INTEROP_ERC20_FIRST_SEEN, dagster_context=context) + daily_to_clickhouse(dataset=INTEROP_NTT_TRANSFERS, dagster_context=context) + daily_to_clickhouse(dataset=INTEROP_OFT_TRANSFERS, dagster_context=context) + daily_to_clickhouse(dataset=INTEROP_NTT_FIRST_SEEN, dagster_context=context) + daily_to_clickhouse(dataset=INTEROP_OFT_FIRST_SEEN, dagster_context=context) # For step 6 we need a back-dated run. What we do is detect ERC-20 create traces # for conracts that have had at least one ERC-20 transfer. If we run at the present diff --git a/src/op_analytics/datapipeline/etl/blockbatchloaddaily/loadspec_datechain.py b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/loadspec_datechain.py index aca49adc789..5bca26f6e25 100644 --- a/src/op_analytics/datapipeline/etl/blockbatchloaddaily/loadspec_datechain.py +++ b/src/op_analytics/datapipeline/etl/blockbatchloaddaily/loadspec_datechain.py @@ -28,7 +28,7 @@ class ETLMixin: def __post_init__(self): for root_path in self.inputs_blockbatch: if not (root_path.startswith("blockbatch/") or root_path.startswith("ingestion/")): - raise ValueError(f"Invalid blockbatch input: {root_path}") + raise ValueError(f"Invalid inputs_blockbatch input: {root_path}") for root_path in self.inputs_clickhouse: # Check that the patch can be sanitized. diff --git a/tests/op_analytics/coreutils/clickhouse/test_read_ddl.py b/tests/op_analytics/coreutils/clickhouse/test_read_ddl.py index d93605e9847..4d64214cea1 100644 --- a/tests/op_analytics/coreutils/clickhouse/test_read_ddl.py +++ b/tests/op_analytics/coreutils/clickhouse/test_read_ddl.py @@ -8,7 +8,7 @@ def test_read_ddls(): assert isinstance(transforms_dir, str) # Read all SQL files in the ddl directory - ddls = read_ddls(transforms_dir, "interop/update/*ntt*") + ddls = read_ddls(transforms_dir, "interop/update/*fact*") # Verify we got some DDL files back assert len(ddls) == 2 @@ -16,6 +16,6 @@ def test_read_ddls(): # Verify each DDL is a non-empty string paths = [_.basename for _ in ddls] assert paths == [ - "03_fact_erc20_ntt_transfers_v1.sql", - "04_dim_erc20_ntt_first_seen_v1.sql", + "06_fact_erc20_create_traces_v2.sql", + "07_export_fact_erc20_create_traces_v1.sql", ] diff --git a/tests/op_analytics/transforms/test_create.py b/tests/op_analytics/transforms/test_create.py index d2478ef7b16..a660d6f92f7 100644 --- a/tests/op_analytics/transforms/test_create.py +++ b/tests/op_analytics/transforms/test_create.py @@ -15,7 +15,7 @@ def test_create_statement(): globstr="*.sql", ) - assert len(ddls) == 9 + assert len(ddls) == 2 create = CreateStatement.of( group_name=group_name, @@ -24,6 +24,6 @@ def test_create_statement(): assert create == CreateStatement( db_name="transforms_interop", - table_name="dim_erc20_first_seen_v1", - statement="CREATE TABLE IF NOT EXISTS transforms_interop.dim_erc20_first_seen_v1\n(\n `chain` String,\n `chain_id` Int32,\n `contract_address` FixedString(66),\n `first_seen` DateTime,\n `row_version` Int64,\n INDEX chain_idx chain TYPE minmax GRANULARITY 1,\n INDEX contract_address_idx contract_address TYPE minmax GRANULARITY 1,\n)\nENGINE = ReplacingMergeTree(row_version)\nORDER BY (chain, contract_address)\n", + table_name="export_fact_erc20_create_traces_v1", + statement="-- Since this table is meant to be exported we use BigQuery-compatible data types.\n\nCREATE TABLE IF NOT EXISTS transforms_interop.export_fact_erc20_create_traces_v1\n(\n `dt` Date,\n `chain` String,\n `chain_id` Int32,\n `network` String,\n `block_timestamp` DateTime,\n `block_number` Int64,\n `transaction_hash` String,\n `transaction_index` Int64,\n `tr_from_address` String,\n `tx_from_address` String,\n `contract_address`String,\n `tx_to_address` String,\n `trace_address` String,\n `trace_type` String,\n `gas` String,\n `gas_used` String,\n `value` UInt256,\n `code` String,\n `call_type` String,\n `reward_type` String,\n `subtraces` Int64,\n `error` String,\n `status` Int64,\n `tx_method_id` String,\n `code_bytelength` Int64,\n `is_erc7802` Bool,\n `has_oft_events` Bool,\n `has_ntt_events` Bool,\n INDEX dt_idx dt TYPE minmax GRANULARITY 1,\n INDEX chain_idx chain TYPE minmax GRANULARITY 1,\n)\nENGINE = ReplacingMergeTree\nORDER BY (dt, chain, chain_id, network, block_number, transaction_hash, transaction_index, trace_address)\n", ) diff --git a/tests/op_analytics/transforms/test_execute.py b/tests/op_analytics/transforms/test_execute.py index aa14fc90b97..01c9ff33230 100644 --- a/tests/op_analytics/transforms/test_execute.py +++ b/tests/op_analytics/transforms/test_execute.py @@ -20,16 +20,12 @@ def test_execute_task(mock_insert: MagicMock, mock_new: MagicMock): group_name="interop", dt=datetime.date(2023, 1, 1), tables={ - "fact_erc20_oft_transfers_v1": TableStructure( - name="fact_erc20_oft_transfers_v1", - columns=[TableColumn(name="dummy", data_type="String")], - ), - "fact_erc20_ntt_transfers_v1": TableStructure( - name="fact_erc20_ntt_transfers_v1", + "fact_erc20_create_traces_v2": TableStructure( + name="fact_erc20_create_traces_v2", columns=[TableColumn(name="dummy", data_type="String")], ), }, - steps_to_run=[2, 3], + steps_to_run=[6], steps_to_skip=None, raise_if_empty=False, ) @@ -51,11 +47,7 @@ def test_execute_task(mock_insert: MagicMock, mock_new: MagicMock): { "parameters": {"dtparam": datetime.date(2023, 1, 1)}, "settings": {"use_hive_partitioning": 1}, - }, - { - "parameters": {"dtparam": datetime.date(2023, 1, 1)}, - "settings": {"use_hive_partitioning": 1}, - }, + } ] inserts = [] @@ -83,7 +75,7 @@ def test_execute_task(mock_insert: MagicMock, mock_new: MagicMock): { "transform": "interop", "dt": datetime.date(2023, 1, 1), - "metadata": '[{"name": "02_fact_erc20_oft_transfers_v1.sql", "result": {"written_rows": 100}}, {"name": "03_fact_erc20_ntt_transfers_v1.sql", "result": {"written_rows": 100}}]', + "metadata": '[{"name": "06_fact_erc20_create_traces_v2.sql", "result": {"written_rows": 100}}]', "process_name": "default", } ],