21 changes: 13 additions & 8 deletions src/op_analytics/dagster/assets/transforms.py
@@ -4,6 +4,7 @@
)

from op_analytics.transforms.main import execute_dt_transforms
from op_analytics.datapipeline.etl.blockbatchloaddaily.main import daily_to_clickhouse


@asset
@@ -17,15 +18,19 @@ def erc20transfers(context: AssetExecutionContext):
def interop(context: AssetExecutionContext):
"""Run interop dataset transformations."""

# Run all steps except for 6 and 7.
result = execute_dt_transforms(
group_name="interop",
force_complete=True,
range_spec="m2days",
steps_to_run=None,
steps_to_skip=[6, 7],
from op_analytics.datapipeline.etl.blockbatchloaddaily.datasets import (
INTEROP_ERC20_FIRST_SEEN,
INTEROP_NTT_FIRST_SEEN,
INTEROP_OFT_FIRST_SEEN,
INTEROP_NTT_TRANSFERS,
INTEROP_OFT_TRANSFERS,
)
context.log.info(result)

daily_to_clickhouse(dataset=INTEROP_ERC20_FIRST_SEEN, dagster_context=context)
daily_to_clickhouse(dataset=INTEROP_NTT_TRANSFERS, dagster_context=context)
daily_to_clickhouse(dataset=INTEROP_OFT_TRANSFERS, dagster_context=context)
daily_to_clickhouse(dataset=INTEROP_NTT_FIRST_SEEN, dagster_context=context)
daily_to_clickhouse(dataset=INTEROP_OFT_FIRST_SEEN, dagster_context=context)

# For step 6 we need a back-dated run. What we do is detect ERC-20 create traces
# for contracts that have had at least one ERC-20 transfer. If we run at the present
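The truncated comment above notes that step 6 needs a back-dated run. A minimal sketch of what such a call could look like, reusing only the execute_dt_transforms parameters visible in the removed lines; the range_spec value is illustrative and the real invocation lives in the truncated remainder of this file:

# Sketch only: run just the back-dated steps over a wider (assumed) window.
result = execute_dt_transforms(
    group_name="interop",
    force_complete=True,
    range_spec="m30days",  # illustrative; the actual window is not shown in this diff
    steps_to_run=[6, 7],
    steps_to_skip=None,
)
context.log.info(result)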
@@ -80,3 +80,45 @@
output_root_path="transforms_dummy/daily_counts_v0",
inputs_clickhouse=["blockbatch_daily/aggtxs/daily_address_summary_v1"],
)


################################################################################
# Interop datasets
################################################################################

RAW_LOGS = "ingestion/logs_v1"
ERC20_TRANSFERS = "blockbatch/token_transfers/erc20_transfers_v1"


INTEROP_ERC20_FIRST_SEEN = ClickHouseDateChainETL(
output_root_path="transforms_interop/dim_erc20_first_seen_v1",
inputs_clickhouse=[ERC20_TRANSFERS],
)

INTEROP_NTT_TRANSFERS = ClickHouseDateChainETL(
output_root_path="transforms_interop/fact_erc20_ntt_transfers_v1",
inputs_blockbatch=[RAW_LOGS],
Collaborator: any difference between this line 101 and 107 for inputs path?

inputs_clickhouse=[ERC20_TRANSFERS],
)

INTEROP_OFT_TRANSFERS = ClickHouseDateChainETL(
output_root_path="transforms_interop/fact_erc20_oft_transfers_v1",
inputs_blockbatch=[
"ingestion/logs_v1",
],
inputs_clickhouse=[ERC20_TRANSFERS],
)

INTEROP_NTT_FIRST_SEEN = ClickHouseDateChainETL(
output_root_path="transforms_interop/dim_erc20_ntt_first_seen_v1",
inputs_clickhouse=[
INTEROP_NTT_TRANSFERS.output_root_path,
],
)

INTEROP_OFT_FIRST_SEEN = ClickHouseDateChainETL(
output_root_path="transforms_interop/dim_erc20_oft_first_seen_v1",
inputs_clickhouse=[
INTEROP_OFT_TRANSFERS.output_root_path,
],
)
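To make the chaining explicit: the first_seen dims take the transfer facts' output_root_path as their ClickHouse input, so the dependency resolves to a plain path string. A small illustrative check, not part of the diff:

from op_analytics.datapipeline.etl.blockbatchloaddaily.datasets import (
    INTEROP_NTT_FIRST_SEEN,
    INTEROP_NTT_TRANSFERS,
)

# The dim reads from the fact's output path.
assert INTEROP_NTT_TRANSFERS.output_root_path == "transforms_interop/fact_erc20_ntt_transfers_v1"
assert INTEROP_NTT_FIRST_SEEN.inputs_clickhouse == [INTEROP_NTT_TRANSFERS.output_root_path]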
@@ -4,8 +4,6 @@ Update first seen erc20 transfers.

*/

INSERT INTO _placeholder_

SELECT
chain
, chain_id
@@ -15,7 +13,5 @@ SELECT
, min(block_timestamp) AS first_seen
, dateDiff('second', first_seen, '2106-01-01 00:00:00'::DATETIME) AS row_version

FROM blockbatch.token_transfers__erc20_transfers_v1
WHERE
dt = {dtparam:Date} -- noqa: LT01,CP02
FROM INPUT_CLICKHOUSE('blockbatch/token_transfers/erc20_transfers_v1')
GROUP BY 1, 2, 3
@@ -4,9 +4,6 @@ Aggregated to keep only the first seen value for each token contract address.

*/

INSERT INTO _placeholder_


SELECT
chain
, chain_id
@@ -16,5 +13,5 @@ SELECT
, min(block_timestamp) AS first_seen
, dateDiff('second', first_seen, '2106-01-01 00:00:00'::DATETIME) AS row_version

FROM transforms_interop.fact_erc20_ntt_transfers_v1
FROM INPUT_CLICKHOUSE('transforms_interop/fact_erc20_ntt_transfers_v1')
GROUP BY 1, 2, 3
@@ -4,9 +4,6 @@ Aggregated to keep only the first seen value for each token contract address.

*/

INSERT INTO _placeholder_


SELECT
chain
, chain_id
@@ -16,5 +13,5 @@ SELECT
, min(block_timestamp) AS first_seen
, dateDiff('second', first_seen, '2106-01-01 00:00:00'::DATETIME) AS row_version

FROM transforms_interop.fact_erc20_oft_transfers_v1
FROM INPUT_CLICKHOUSE('transforms_interop/fact_erc20_oft_transfers_v1')
GROUP BY 1, 2, 3
@@ -20,8 +20,6 @@ https://optimistic.etherscan.io/tx/0x9ae78927d9771a2bcd89fc9eb467c063753dc30214d

*/

INSERT INTO _placeholder_

WITH


@@ -33,13 +31,7 @@ ntt_delivery_events AS ( -- noqa: ST03
, transaction_hash
, log_index

FROM
blockbatch_gcs.read_date(
rootpath = 'ingestion/logs_v1'
, chain = '*'
, dt = { dtparam: Date }
)

FROM INPUT_BLOCKBATCH('ingestion/logs_v1')
WHERE
--
-- Delivery(
@@ -71,13 +63,11 @@ SELECT
, t.from_address
, t.to_address

FROM blockbatch.token_transfers__erc20_transfers_v1 AS t

FROM INPUT_CLICKHOUSE('blockbatch/token_transfers/erc20_transfers_v1') AS t
INNER JOIN ntt_delivery_events AS n
ON
t.chain_id = n.chain_id
AND t.transaction_hash = n.transaction_hash
WHERE
t.dt = { dtparam: Date }
-- Transfer is before Delivery
AND t.log_index < n.log_index
t.log_index < n.log_index
@@ -9,8 +9,6 @@ cover OFTAdapter tokens.

*/

INSERT INTO _placeholder_

WITH

oft_sent_events AS ( -- noqa: ST03
@@ -19,13 +17,7 @@ oft_sent_events AS ( -- noqa: ST03
, transaction_hash
, address AS contract_address

FROM
blockbatch_gcs.read_date(
rootpath = 'ingestion/logs_v1'
, chain = '*'
, dt = { dtparam: Date }
)

FROM INPUT_BLOCKBATCH('ingestion/logs_v1')
WHERE
-- OFT Docs:
-- https://docs.layerzero.network/v2/home/token-standards/oft-standard
@@ -59,7 +51,6 @@ SELECT
, t.from_address
, t.to_address

FROM blockbatch.token_transfers__erc20_transfers_v1 AS t
FROM INPUT_CLICKHOUSE('blockbatch/token_transfers/erc20_transfers_v1') AS t
WHERE
t.dt = { dtparam: Date }
AND (t.chain_id, t.transaction_hash, t.contract_address) IN (oft_sent_events)
(t.chain_id, t.transaction_hash, t.contract_address) IN (oft_sent_events)
@@ -27,8 +27,8 @@ class ETLMixin:

def __post_init__(self):
for root_path in self.inputs_blockbatch:
if not root_path.startswith("blockbatch/"):
raise ValueError(f"Invalid blockbatch input: {root_path}")
if not (root_path.startswith("blockbatch/") or root_path.startswith("ingestion/")):
raise ValueError(f"Invalid inputs_blockbatch input: {root_path}")

for root_path in self.inputs_clickhouse:
# Check that the patch can be sanitized.
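A minimal sketch of what the relaxed check permits, assuming ClickHouseDateChainETL mixes in ETLMixin as the dataset definitions above suggest (the import path for the class is omitted here, and the failing path is hypothetical):

# "ingestion/..." blockbatch inputs are now accepted alongside "blockbatch/..." ones.
ok = ClickHouseDateChainETL(
    output_root_path="transforms_interop/fact_erc20_ntt_transfers_v1",
    inputs_blockbatch=["ingestion/logs_v1"],
    inputs_clickhouse=["blockbatch/token_transfers/erc20_transfers_v1"],
)

# Any other prefix still raises in __post_init__.
try:
    ClickHouseDateChainETL(
        output_root_path="transforms_interop/fact_erc20_ntt_transfers_v1",
        inputs_blockbatch=["gcs/logs_v1"],  # hypothetical bad prefix
        inputs_clickhouse=["blockbatch/token_transfers/erc20_transfers_v1"],
    )
except ValueError as err:
    print(err)  # Invalid inputs_blockbatch input: gcs/logs_v1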

This file was deleted.

This file was deleted.

6 changes: 3 additions & 3 deletions tests/op_analytics/coreutils/clickhouse/test_read_ddl.py
@@ -8,14 +8,14 @@ def test_read_ddls():
assert isinstance(transforms_dir, str)

# Read all SQL files in the ddl directory
ddls = read_ddls(transforms_dir, "interop/update/*ntt*")
ddls = read_ddls(transforms_dir, "interop/update/*fact*")

# Verify we got some DDL files back
assert len(ddls) == 2

# Verify each DDL is a non-empty string
paths = [_.basename for _ in ddls]
assert paths == [
"03_fact_erc20_ntt_transfers_v1.sql",
"04_dim_erc20_ntt_first_seen_v1.sql",
"06_fact_erc20_create_traces_v2.sql",
"07_export_fact_erc20_create_traces_v1.sql",
]
@@ -0,0 +1,25 @@
INSERT INTO transforms_interop.dim_erc20_first_seen_v1
/**
Update first seen erc20 transfers.
*/

SELECT
chain
, chain_id
, contract_address

-- transfers seen earlier have greater ReplacingMergeTree row_version
, min(block_timestamp) AS first_seen
, dateDiff('second', first_seen, '2106-01-01 00:00:00'::DATETIME) AS row_version

FROM
(
SELECT
*
FROM blockbatch.token_transfers__erc20_transfers_v1
WHERE dt = '2025-01-01' AND chain = 'base'
Collaborator: and here

)

GROUP BY 1, 2, 3
@@ -0,0 +1,25 @@
INSERT INTO transforms_interop.dim_erc20_ntt_first_seen_v1
/**
Aggregated to keep only the first seen value for each token contract address.
*/

SELECT
chain
, chain_id
, contract_address

-- events seen earlier have greater ReplacingMergeTree row_version
, min(block_timestamp) AS first_seen
, dateDiff('second', first_seen, '2106-01-01 00:00:00'::DATETIME) AS row_version

FROM
(
SELECT
*
FROM transforms_interop.fact_erc20_ntt_transfers_v1
WHERE dt = '2025-01-01' AND chain = 'base'
Collaborator: A bit confused on this part, but wonder why the dt and chain is set to base only on 2025-01-01?

)

GROUP BY 1, 2, 3
@@ -0,0 +1,25 @@
INSERT INTO transforms_interop.dim_erc20_oft_first_seen_v1
/**
Aggregated to keep only the first seen value for each token contract address.
*/

SELECT
chain
, chain_id
, contract_address

-- events seen earlier have greater ReplacingMergeTree row_version
, min(block_timestamp) AS first_seen
, dateDiff('second', first_seen, '2106-01-01 00:00:00'::DATETIME) AS row_version

FROM
(
SELECT
*
FROM transforms_interop.fact_erc20_oft_transfers_v1
WHERE dt = '2025-01-01' AND chain = 'base'
Collaborator: same here

)

GROUP BY 1, 2, 3