From e9bcda4430b427bee92061b3331e7832e9632116 Mon Sep 17 00:00:00 2001 From: Dennis Date: Fri, 21 Feb 2025 08:21:18 +0200 Subject: [PATCH 1/9] sugar pull poc --- src/op_analytics/dagster/assets/sugar.py | 17 +++++ .../datasources/sugar/__init__.py | 0 .../datasources/sugar/chain_list.py | 4 ++ .../datasources/sugar/chains/__init__.py | 0 .../datasources/sugar/chains/chains.py | 60 +++++++++++++++++ .../datasources/sugar/dataaccess.py | 7 ++ src/op_analytics/datasources/sugar/execute.py | 67 +++++++++++++++++++ .../datasources/sugar/pools/__init__.py | 0 .../datasources/sugar/pools/pools.py | 57 ++++++++++++++++ .../datasources/sugar/prices/__init__.py | 0 .../sugar/prices/dynamic_prices.py | 49 ++++++++++++++ 11 files changed, 261 insertions(+) create mode 100644 src/op_analytics/dagster/assets/sugar.py create mode 100644 src/op_analytics/datasources/sugar/__init__.py create mode 100644 src/op_analytics/datasources/sugar/chain_list.py create mode 100644 src/op_analytics/datasources/sugar/chains/__init__.py create mode 100644 src/op_analytics/datasources/sugar/chains/chains.py create mode 100644 src/op_analytics/datasources/sugar/dataaccess.py create mode 100644 src/op_analytics/datasources/sugar/execute.py create mode 100644 src/op_analytics/datasources/sugar/pools/__init__.py create mode 100644 src/op_analytics/datasources/sugar/pools/pools.py create mode 100644 src/op_analytics/datasources/sugar/prices/__init__.py create mode 100644 src/op_analytics/datasources/sugar/prices/dynamic_prices.py diff --git a/src/op_analytics/dagster/assets/sugar.py b/src/op_analytics/dagster/assets/sugar.py new file mode 100644 index 00000000000..83dcefab984 --- /dev/null +++ b/src/op_analytics/dagster/assets/sugar.py @@ -0,0 +1,17 @@ +from dagster import ( + OpExecutionContext, + asset, +) + + +@asset(group_name="sugar") +def sugar_daily(context: OpExecutionContext) -> None: + """Pull daily Sugar protocol data. 
+ + Fetches and processes daily Sugar protocol metrics and stores them in our data warehouse. + The data includes key protocol metrics like TVL, volume, and other relevant statistics. + """ + from op_analytics.datasources.sugar import execute + + result = execute.execute_pull() + context.log.info("Sugar daily pull completed", result=result) diff --git a/src/op_analytics/datasources/sugar/__init__.py b/src/op_analytics/datasources/sugar/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/op_analytics/datasources/sugar/chain_list.py b/src/op_analytics/datasources/sugar/chain_list.py new file mode 100644 index 00000000000..fa3f66b9602 --- /dev/null +++ b/src/op_analytics/datasources/sugar/chain_list.py @@ -0,0 +1,4 @@ +from sugar.chains import BaseChain, OPChain + +# So far, sugar-sdk only supports BaseChain and OPChain +chain_list = [BaseChain, OPChain] diff --git a/src/op_analytics/datasources/sugar/chains/__init__.py b/src/op_analytics/datasources/sugar/chains/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/op_analytics/datasources/sugar/chains/chains.py b/src/op_analytics/datasources/sugar/chains/chains.py new file mode 100644 index 00000000000..a83694222bd --- /dev/null +++ b/src/op_analytics/datasources/sugar/chains/chains.py @@ -0,0 +1,60 @@ +from typing import Any, Dict, List, Tuple + +from op_analytics.coreutils.logger import structlog +from op_analytics.datasources.sugar.prices.dynamic_prices import fetch_prices_with_retry + +log = structlog.get_logger() + + +async def fetch_chain_data( + chain_cls: type, +) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: + """ + Fetch chain data (tokens, pools, prices) for a given chain class. + + Args: + chain_cls: A Sugar chain class (e.g. OPChain or BaseChain). + + Returns: + A tuple with three lists: + - tokens_data: List of dictionaries for tokens. + - pools_data: List of dictionaries for liquidity pools. 
+ - prices_data: List of dictionaries for token prices. + """ + # Initialize chain instance (assumed async context manager) + async with chain_cls() as chain: + tokens = await chain.get_all_tokens(listed_only=True) + log.info(f"{chain_cls.__name__}: Fetched {len(tokens)} tokens.") + + # Build token mapping if needed + tokens_data = [ + { + "token_address": t.token_address, + "symbol": t.symbol, + "decimals": t.decimals, + "listed": t.listed, + } + for t in tokens + ] + + pools = await chain.get_pools() + log.info(f"{chain_cls.__name__}: Fetched {len(pools)} liquidity pools.") + pools_data = [ + { + "lp": p.lp, + "factory": p.factory, + "symbol": p.symbol, + "is_stable": p.is_stable, + "total_supply": p.total_supply, + "decimals": p.decimals, + "token0": p.token0.symbol if p.token0 else None, + "token1": p.token1.symbol if p.token1 else None, + "pool_fee": p.pool_fee, + } + for p in pools + ] + + prices_data = await fetch_prices_with_retry(chain, tokens, initial_batch_size=40) + log.info(f"{chain_cls.__name__}: Fetched prices for {len(prices_data)} tokens.") + + return tokens_data, pools_data, prices_data diff --git a/src/op_analytics/datasources/sugar/dataaccess.py b/src/op_analytics/datasources/sugar/dataaccess.py new file mode 100644 index 00000000000..c9f269d3273 --- /dev/null +++ b/src/op_analytics/datasources/sugar/dataaccess.py @@ -0,0 +1,7 @@ +from op_analytics.coreutils.partitioned.dailydata import DailyDataset + + +class SugarDataAccess(DailyDataset): + TOKENS = "sugar_tokens_v1" + POOLS = "sugar_liquidity_pools_v1" + PRICES = "sugar_prices_v1" diff --git a/src/op_analytics/datasources/sugar/execute.py b/src/op_analytics/datasources/sugar/execute.py new file mode 100644 index 00000000000..dffebab0016 --- /dev/null +++ b/src/op_analytics/datasources/sugar/execute.py @@ -0,0 +1,67 @@ +""" +Sugar data ingestion pipeline. 
+ +This module pulls Sugar protocol data from all chains and writes the tokens, pools, +and prices to partitioned datasets in ClickHouse or GCS (depending on the configuration). +""" + +import polars as pl +from typing import Dict, Any, List + +from op_analytics.coreutils.logger import structlog +from op_analytics.coreutils.partitioned.dailydatautils import dt_summary +from op_analytics.datasources.sugar.dataaccess import SugarDataAccess +from op_analytics.datasources.sugar.chain_list import chain_list +from op_analytics.datasources.sugar.chains.chains import fetch_chain_data + +log = structlog.get_logger() + + +async def _collect_data() -> Dict[str, List[Dict[str, Any]]]: + """ + Collects tokens, pools, and prices data from each configured chain (OPChain, BaseChain). + Returns: + Dictionary containing three lists, keyed by "tokens", "pools", and "prices". + """ + all_data = {"tokens": [], "pools": [], "prices": []} + + for chain_cls in chain_list: + tokens, pools, prices = await fetch_chain_data(chain_cls) + all_data["tokens"].extend(tokens) + all_data["pools"].extend(pools) + all_data["prices"].extend(prices) + + return all_data + + +def _write_data( + data: List[Dict[str, Any]], + dataset: SugarDataAccess, + data_type: str, +) -> Dict[str, Any]: + """ + Writes data to the dataset and returns a summary for logging. + """ + df = pl.DataFrame(data) + dataset.write(df) + + summary = {f"{data_type}_df": dt_summary(df)} + log.info(f"Sugar {data_type} ingestion completed", summary=summary) + return summary + + +async def execute_pull() -> Dict[str, Any]: + """ + Main Sugar ingestion entrypoint. + Fetches the data from all chains, writes to configured datasets, + and returns a summary dictionary. 
+ """ + all_data = await _collect_data() + + summary: Dict[str, Any] = {} + summary.update(_write_data(all_data["tokens"], SugarDataAccess.TOKENS, "tokens")) + summary.update(_write_data(all_data["pools"], SugarDataAccess.POOLS, "pools")) + summary.update(_write_data(all_data["prices"], SugarDataAccess.PRICES, "prices")) + + log.info("Sugar ingestion completed", summary=summary) + return summary diff --git a/src/op_analytics/datasources/sugar/pools/__init__.py b/src/op_analytics/datasources/sugar/pools/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/op_analytics/datasources/sugar/pools/pools.py b/src/op_analytics/datasources/sugar/pools/pools.py new file mode 100644 index 00000000000..dec94767528 --- /dev/null +++ b/src/op_analytics/datasources/sugar/pools/pools.py @@ -0,0 +1,57 @@ +from typing import List + +from sugar.pool import LiquidityPool +from op_analytics.coreutils.logger import structlog + +log = structlog.get_logger() + + +async def fetch_pool_data(chain) -> List[LiquidityPool]: + """ + Fetch raw pool data without calling get_prices. We build a token mapping + from get_all_tokens directly, then map them into LiquidityPool objects. + + Handles pagination with retries on out-of-gas errors by reducing batch size. 
+ """ + pools = [] + offset = 0 + limit = chain.settings.pool_page_size + + tokens = await chain.get_all_tokens(listed_only=True) + tokens_map = {t.token_address: t for t in tokens} + + while True: + try: + pools_batch = await chain.sugar.functions.all(limit, offset).call() + pools.extend(pools_batch) + log.info( + "Fetched pool batch", + offset=offset, + batch_size=len(pools_batch), + total_pools=len(pools), + ) + if len(pools_batch) < limit: + break + offset += limit + + except Exception as exc: + error_str = str(exc) + if "out of gas" in error_str: + if limit > 1: + new_limit = max(1, limit // 2) + log.warning( + "Reducing batch size due to out of gas error", + old_size=limit, + new_size=new_limit, + ) + limit = new_limit + else: + log.error("Failed to fetch pools with minimum batch size", error=error_str) + raise + else: + log.error("Unexpected error fetching pools", error=error_str) + raise + + result = [LiquidityPool.from_tuple(p, tokens_map) for p in pools if p is not None] + log.info("Pool data fetch completed", total_pools=len(result)) + return result diff --git a/src/op_analytics/datasources/sugar/prices/__init__.py b/src/op_analytics/datasources/sugar/prices/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/op_analytics/datasources/sugar/prices/dynamic_prices.py b/src/op_analytics/datasources/sugar/prices/dynamic_prices.py new file mode 100644 index 00000000000..8a369bc7608 --- /dev/null +++ b/src/op_analytics/datasources/sugar/prices/dynamic_prices.py @@ -0,0 +1,49 @@ +from typing import List, Any + +from op_analytics.coreutils.logger import structlog + +log = structlog.get_logger() + + +async def fetch_prices_with_retry( + chain: Any, tokens: List[Any], initial_batch_size: int = 40 +) -> List[Any]: + """ + Fetch prices for a list of tokens using dynamic batch sizing. + + Retries with a reduced batch size if known errors occur. 
+ """ + prices = [] + index = 0 + current_batch_size = initial_batch_size + + while index < len(tokens): + token_chunk = tokens[index : index + current_batch_size] + try: + batch_prices = await chain.get_prices(token_chunk) + prices.extend(batch_prices) + log.info( + "Fetched token prices", + start=index, + end=index + current_batch_size, + total=len(tokens), + ) + index += current_batch_size + except Exception as exc: + error_str = str(exc) + if "out of gas" in error_str or "0x3445e17c" in error_str: + if current_batch_size > 1: + log.warning( + "Reducing batch size due to error", + error=error_str, + old_size=current_batch_size, + new_size=current_batch_size // 2, + ) + current_batch_size = max(1, current_batch_size // 2) + else: + log.error("Skipping token due to persistent error", error=error_str) + index += 1 + else: + log.error("Unexpected error fetching prices", error=error_str) + raise + return [p for p in prices if p is not None] From 5c514ae3a4dc421fdccd1b3740bf072ebdf451bb Mon Sep 17 00:00:00 2001 From: Dennis Date: Wed, 26 Feb 2025 12:53:31 +0200 Subject: [PATCH 2/9] implement changes for the first batch of comments --- src/op_analytics/dagster/assets/sugar.py | 26 ++++--- .../datasources/sugar/chains/__init__.py | 0 .../datasources/sugar/chains/chains.py | 60 -------------- src/op_analytics/datasources/sugar/execute.py | 78 ++++++++----------- src/op_analytics/datasources/sugar/pools.py | 65 ++++++++++++++++ .../datasources/sugar/pools/__init__.py | 0 .../datasources/sugar/pools/pools.py | 57 -------------- src/op_analytics/datasources/sugar/prices.py | 42 ++++++++++ .../datasources/sugar/prices/__init__.py | 0 .../sugar/prices/dynamic_prices.py | 49 ------------ src/op_analytics/datasources/sugar/tokens.py | 43 ++++++++++ .../datasources/velodrome/sugarwrapper.py | 14 ++++ 12 files changed, 212 insertions(+), 222 deletions(-) delete mode 100644 src/op_analytics/datasources/sugar/chains/__init__.py delete mode 100644 
src/op_analytics/datasources/sugar/chains/chains.py create mode 100644 src/op_analytics/datasources/sugar/pools.py delete mode 100644 src/op_analytics/datasources/sugar/pools/__init__.py delete mode 100644 src/op_analytics/datasources/sugar/pools/pools.py create mode 100644 src/op_analytics/datasources/sugar/prices.py delete mode 100644 src/op_analytics/datasources/sugar/prices/__init__.py delete mode 100644 src/op_analytics/datasources/sugar/prices/dynamic_prices.py create mode 100644 src/op_analytics/datasources/sugar/tokens.py diff --git a/src/op_analytics/dagster/assets/sugar.py b/src/op_analytics/dagster/assets/sugar.py index 83dcefab984..12e04e53f36 100644 --- a/src/op_analytics/dagster/assets/sugar.py +++ b/src/op_analytics/dagster/assets/sugar.py @@ -1,17 +1,19 @@ -from dagster import ( - OpExecutionContext, - asset, -) +from op_analytics.coreutils.partitioned.dailydata import DailyDataset -@asset(group_name="sugar") -def sugar_daily(context: OpExecutionContext) -> None: - """Pull daily Sugar protocol data. +class Sugar(DailyDataset): + """ + The Sugar dataset tracks tokens, pools, and prices from the Velodrome sugar-sdk. + See also: + - https://github.com/velodrome-finance/sugar + - https://github.com/velodrome-finance/sugar-sdk - Fetches and processes daily Sugar protocol metrics and stores them in our data warehouse. - The data includes key protocol metrics like TVL, volume, and other relevant statistics. 
+ Tables: + - tokens_v1 + - pools_v1 + - prices_v1 """ - from op_analytics.datasources.sugar import execute - result = execute.execute_pull() - context.log.info("Sugar daily pull completed", result=result) + TOKENS = "tokens_v1" + POOLS = "pools_v1" + PRICES = "prices_v1" diff --git a/src/op_analytics/datasources/sugar/chains/__init__.py b/src/op_analytics/datasources/sugar/chains/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/op_analytics/datasources/sugar/chains/chains.py b/src/op_analytics/datasources/sugar/chains/chains.py deleted file mode 100644 index a83694222bd..00000000000 --- a/src/op_analytics/datasources/sugar/chains/chains.py +++ /dev/null @@ -1,60 +0,0 @@ -from typing import Any, Dict, List, Tuple - -from op_analytics.coreutils.logger import structlog -from op_analytics.datasources.sugar.prices.dynamic_prices import fetch_prices_with_retry - -log = structlog.get_logger() - - -async def fetch_chain_data( - chain_cls: type, -) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: - """ - Fetch chain data (tokens, pools, prices) for a given chain class. - - Args: - chain_cls: A Sugar chain class (e.g. OPChain or BaseChain). - - Returns: - A tuple with three lists: - - tokens_data: List of dictionaries for tokens. - - pools_data: List of dictionaries for liquidity pools. - - prices_data: List of dictionaries for token prices. 
- """ - # Initialize chain instance (assumed async context manager) - async with chain_cls() as chain: - tokens = await chain.get_all_tokens(listed_only=True) - log.info(f"{chain_cls.__name__}: Fetched {len(tokens)} tokens.") - - # Build token mapping if needed - tokens_data = [ - { - "token_address": t.token_address, - "symbol": t.symbol, - "decimals": t.decimals, - "listed": t.listed, - } - for t in tokens - ] - - pools = await chain.get_pools() - log.info(f"{chain_cls.__name__}: Fetched {len(pools)} liquidity pools.") - pools_data = [ - { - "lp": p.lp, - "factory": p.factory, - "symbol": p.symbol, - "is_stable": p.is_stable, - "total_supply": p.total_supply, - "decimals": p.decimals, - "token0": p.token0.symbol if p.token0 else None, - "token1": p.token1.symbol if p.token1 else None, - "pool_fee": p.pool_fee, - } - for p in pools - ] - - prices_data = await fetch_prices_with_retry(chain, tokens, initial_batch_size=40) - log.info(f"{chain_cls.__name__}: Fetched prices for {len(prices_data)} tokens.") - - return tokens_data, pools_data, prices_data diff --git a/src/op_analytics/datasources/sugar/execute.py b/src/op_analytics/datasources/sugar/execute.py index dffebab0016..41f5fead0d8 100644 --- a/src/op_analytics/datasources/sugar/execute.py +++ b/src/op_analytics/datasources/sugar/execute.py @@ -1,67 +1,57 @@ -""" -Sugar data ingestion pipeline. - -This module pulls Sugar protocol data from all chains and writes the tokens, pools, -and prices to partitioned datasets in ClickHouse or GCS (depending on the configuration). 
-""" - +from sugar.chains import BaseChain, OPChain import polars as pl -from typing import Dict, Any, List - from op_analytics.coreutils.logger import structlog from op_analytics.coreutils.partitioned.dailydatautils import dt_summary -from op_analytics.datasources.sugar.dataaccess import SugarDataAccess -from op_analytics.datasources.sugar.chain_list import chain_list -from op_analytics.datasources.sugar.chains.chains import fetch_chain_data +from op_analytics.dagster.assets.sugar import Sugar +from op_analytics.datasources.sugar.tokens import fetch_tokens_for_chain +from op_analytics.datasources.sugar.pools import fetch_pools_for_chain +from op_analytics.datasources.sugar.prices import fetch_prices_for_chain log = structlog.get_logger() +CHAIN_LIST = [OPChain, BaseChain] -async def _collect_data() -> Dict[str, List[Dict[str, Any]]]: + +def _collect_data() -> dict[str, list[pl.DataFrame]]: """ - Collects tokens, pools, and prices data from each configured chain (OPChain, BaseChain). - Returns: - Dictionary containing three lists, keyed by "tokens", "pools", and "prices". + Collects tokens, pools, and prices data from each chain in CHAIN_LIST, + by leveraging sugarwrapper.py calls indirectly via sugar.tokens, sugar.pools, sugar.prices. + Returns lists of DataFrames for each data type. 
""" all_data = {"tokens": [], "pools": [], "prices": []} - for chain_cls in chain_list: - tokens, pools, prices = await fetch_chain_data(chain_cls) - all_data["tokens"].extend(tokens) - all_data["pools"].extend(pools) - all_data["prices"].extend(prices) + for chain_cls in CHAIN_LIST: + chain_name = chain_cls.__name__ + log.info(f"Fetching Sugar data for chain={chain_name}") + + chain_tokens_df = fetch_tokens_for_chain(chain_cls) + chain_pools_df = fetch_pools_for_chain(chain_cls) + chain_prices_df = fetch_prices_for_chain(chain_cls) + + all_data["tokens"].append(chain_tokens_df) + all_data["pools"].append(chain_pools_df) + all_data["prices"].append(chain_prices_df) return all_data -def _write_data( - data: List[Dict[str, Any]], - dataset: SugarDataAccess, - data_type: str, -) -> Dict[str, Any]: +def execute_pull() -> dict[str, dict]: """ - Writes data to the dataset and returns a summary for logging. + Main Sugar ingestion entrypoint. Fetches tokens, pools, and prices + for each chain in CHAIN_LIST, then writes them to the Sugar dataset. + Returns a summary of counts for logging. """ - df = pl.DataFrame(data) - dataset.write(df) - - summary = {f"{data_type}_df": dt_summary(df)} - log.info(f"Sugar {data_type} ingestion completed", summary=summary) - return summary + collected = _collect_data() + summary = {} + Sugar.write_dataset(collected["tokens"], Sugar.TOKENS) + summary["tokens_df"] = dt_summary(collected["tokens"]) -async def execute_pull() -> Dict[str, Any]: - """ - Main Sugar ingestion entrypoint. - Fetches the data from all chains, writes to configured datasets, - and returns a summary dictionary. 
- """ - all_data = await _collect_data() + Sugar.write_dataset(collected["pools"], Sugar.POOLS) + summary["pools_df"] = dt_summary(collected["pools"]) - summary: Dict[str, Any] = {} - summary.update(_write_data(all_data["tokens"], SugarDataAccess.TOKENS, "tokens")) - summary.update(_write_data(all_data["pools"], SugarDataAccess.POOLS, "pools")) - summary.update(_write_data(all_data["prices"], SugarDataAccess.PRICES, "prices")) + Sugar.write_dataset(collected["prices"], Sugar.PRICES) + summary["prices_df"] = dt_summary(collected["prices"]) log.info("Sugar ingestion completed", summary=summary) return summary diff --git a/src/op_analytics/datasources/sugar/pools.py b/src/op_analytics/datasources/sugar/pools.py new file mode 100644 index 00000000000..e7a06dd39fc --- /dev/null +++ b/src/op_analytics/datasources/sugar/pools.py @@ -0,0 +1,65 @@ +from op_analytics.coreutils.logger import structlog +import polars as pl +from sugar.pool import LiquidityPool + +from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools, chain_cls_to_str + +log = structlog.get_logger() + +POOLS_SCHEMA = { + "chain": pl.Utf8, + "lp": pl.Utf8, + "factory": pl.Utf8, + "symbol": pl.Utf8, + "is_stable": pl.Boolean, + "total_supply": pl.Float64, + "decimals": pl.Int64, + "token0": pl.Utf8, + "token1": pl.Utf8, + "pool_fee": pl.Float64, + "gauge_total_supply": pl.Float64, + "emissions_token": pl.Utf8, + "nfpm": pl.Utf8, + "alm": pl.Utf8, +} + + +def fetch_pools_for_chain(chain_cls: type) -> pl.DataFrame: + """ + Fetch pool data from sugarwrapper.py. We then map the returned LiquidityPool objects + into a Polars DataFrame defined by POOLS_SCHEMA. 
+ """ + chain_str = chain_cls_to_str(chain_cls) + log.info(f"Fetching pools for {chain_str}") + + velodrome_pools = fetch_pools(chain_str) + raw_pools = velodrome_pools.pools # list[LiquidityPool] + + pool_records = [] + for lp in raw_pools: + if not isinstance(lp, LiquidityPool): + continue + pool_records.append( + { + "chain": chain_str, + "lp": lp.lp, + "symbol": lp.symbol, + "factory": lp.factory, + "is_stable": lp.is_stable, + "total_supply": float(lp.total_supply), + "decimals": int(lp.decimals), + "token0": lp.token0.token_address if lp.token0 else "unknown", + "token1": lp.token1.token_address if lp.token1 else "unknown", + "pool_fee": float(lp.pool_fee), + "gauge_total_supply": float(lp.gauge_total_supply), + "emissions_token": lp.emissions_token.token_address + if lp.emissions_token + else "unknown", + "nfpm": lp.nfpm, + "alm": lp.alm, + } + ) + + df = pl.DataFrame(pool_records, schema=POOLS_SCHEMA) + log.info(f"{chain_str} returned {df.height} pools.") + return df diff --git a/src/op_analytics/datasources/sugar/pools/__init__.py b/src/op_analytics/datasources/sugar/pools/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/op_analytics/datasources/sugar/pools/pools.py b/src/op_analytics/datasources/sugar/pools/pools.py deleted file mode 100644 index dec94767528..00000000000 --- a/src/op_analytics/datasources/sugar/pools/pools.py +++ /dev/null @@ -1,57 +0,0 @@ -from typing import List - -from sugar.pool import LiquidityPool -from op_analytics.coreutils.logger import structlog - -log = structlog.get_logger() - - -async def fetch_pool_data(chain) -> List[LiquidityPool]: - """ - Fetch raw pool data without calling get_prices. We build a token mapping - from get_all_tokens directly, then map them into LiquidityPool objects. - - Handles pagination with retries on out-of-gas errors by reducing batch size. 
- """ - pools = [] - offset = 0 - limit = chain.settings.pool_page_size - - tokens = await chain.get_all_tokens(listed_only=True) - tokens_map = {t.token_address: t for t in tokens} - - while True: - try: - pools_batch = await chain.sugar.functions.all(limit, offset).call() - pools.extend(pools_batch) - log.info( - "Fetched pool batch", - offset=offset, - batch_size=len(pools_batch), - total_pools=len(pools), - ) - if len(pools_batch) < limit: - break - offset += limit - - except Exception as exc: - error_str = str(exc) - if "out of gas" in error_str: - if limit > 1: - new_limit = max(1, limit // 2) - log.warning( - "Reducing batch size due to out of gas error", - old_size=limit, - new_size=new_limit, - ) - limit = new_limit - else: - log.error("Failed to fetch pools with minimum batch size", error=error_str) - raise - else: - log.error("Unexpected error fetching pools", error=error_str) - raise - - result = [LiquidityPool.from_tuple(p, tokens_map) for p in pools if p is not None] - log.info("Pool data fetch completed", total_pools=len(result)) - return result diff --git a/src/op_analytics/datasources/sugar/prices.py b/src/op_analytics/datasources/sugar/prices.py new file mode 100644 index 00000000000..a6f51492993 --- /dev/null +++ b/src/op_analytics/datasources/sugar/prices.py @@ -0,0 +1,42 @@ +from op_analytics.coreutils.logger import structlog +import polars as pl + +from sugar.price import Price + +from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools, chain_cls_to_str + +log = structlog.get_logger() + +PRICES_SCHEMA = { + "chain": pl.Utf8, + "token_address": pl.Utf8, + "price": pl.Float64, +} + + +def fetch_prices_for_chain(chain_cls: type) -> pl.DataFrame: + """ + Fetch token prices for each chain by leveraging sugarwrapper.py's fetch_pools(). + We then build a DataFrame from the returned Price objects. 
+ """ + chain_str = chain_cls_to_str(chain_cls) + log.info(f"Fetching prices for {chain_str} via sugarwrapper fetch_pools()") + + velodrome_pools = fetch_pools(chain_str) + prices = velodrome_pools.prices # list[Price] + + price_records = [] + for cp in prices: + if not isinstance(cp, Price): + continue + price_records.append( + { + "chain": chain_str, + "token_address": cp.token.token_address, + "price": cp.price, + } + ) + + df = pl.DataFrame(price_records, schema=PRICES_SCHEMA) + log.info(f"{chain_str} returned {df.height} prices.") + return df diff --git a/src/op_analytics/datasources/sugar/prices/__init__.py b/src/op_analytics/datasources/sugar/prices/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/op_analytics/datasources/sugar/prices/dynamic_prices.py b/src/op_analytics/datasources/sugar/prices/dynamic_prices.py deleted file mode 100644 index 8a369bc7608..00000000000 --- a/src/op_analytics/datasources/sugar/prices/dynamic_prices.py +++ /dev/null @@ -1,49 +0,0 @@ -from typing import List, Any - -from op_analytics.coreutils.logger import structlog - -log = structlog.get_logger() - - -async def fetch_prices_with_retry( - chain: Any, tokens: List[Any], initial_batch_size: int = 40 -) -> List[Any]: - """ - Fetch prices for a list of tokens using dynamic batch sizing. - - Retries with a reduced batch size if known errors occur. 
- """ - prices = [] - index = 0 - current_batch_size = initial_batch_size - - while index < len(tokens): - token_chunk = tokens[index : index + current_batch_size] - try: - batch_prices = await chain.get_prices(token_chunk) - prices.extend(batch_prices) - log.info( - "Fetched token prices", - start=index, - end=index + current_batch_size, - total=len(tokens), - ) - index += current_batch_size - except Exception as exc: - error_str = str(exc) - if "out of gas" in error_str or "0x3445e17c" in error_str: - if current_batch_size > 1: - log.warning( - "Reducing batch size due to error", - error=error_str, - old_size=current_batch_size, - new_size=current_batch_size // 2, - ) - current_batch_size = max(1, current_batch_size // 2) - else: - log.error("Skipping token due to persistent error", error=error_str) - index += 1 - else: - log.error("Unexpected error fetching prices", error=error_str) - raise - return [p for p in prices if p is not None] diff --git a/src/op_analytics/datasources/sugar/tokens.py b/src/op_analytics/datasources/sugar/tokens.py new file mode 100644 index 00000000000..bc1968e1f12 --- /dev/null +++ b/src/op_analytics/datasources/sugar/tokens.py @@ -0,0 +1,43 @@ +from op_analytics.coreutils.logger import structlog +import polars as pl + +from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools, chain_cls_to_str + +log = structlog.get_logger() + +TOKEN_SCHEMA = { + "chain": pl.Utf8, + "token_address": pl.Utf8, + "symbol": pl.Utf8, + "decimals": pl.Int64, + "listed": pl.Boolean, +} + + +def fetch_tokens_for_chain(chain_cls: type) -> pl.DataFrame: + """ + Fetch token metadata for a single chain by leveraging sugarwrapper.py's fetch_pools(). + Returns a Polars DataFrame matching TOKEN_SCHEMA. 
+ """ + chain_str = chain_cls_to_str(chain_cls) + log.info(f"Fetching tokens for {chain_str} via sugarwrapper fetch_pools()") + + # fetch_pools() returns a VelodromePools dataclass; we only need tokens here + velodrome_pools = fetch_pools(chain_str) + tokens = velodrome_pools.tokens # a list of sugar.token.Token objects + + token_records = [] + for t in tokens: + token_records.append( + { + "chain": chain_str, + "token_address": t.token_address, + "symbol": t.symbol, + "decimals": t.decimals, + "listed": t.listed, # if the Token object has a 'listed' attribute + } + ) + + df = pl.DataFrame(token_records, schema=TOKEN_SCHEMA) + log.info(f"{chain_str} returned {df.height} tokens.") + return df diff --git a/src/op_analytics/datasources/velodrome/sugarwrapper.py b/src/op_analytics/datasources/velodrome/sugarwrapper.py index c469cf8941f..b7d2494b5eb 100644 --- a/src/op_analytics/datasources/velodrome/sugarwrapper.py +++ b/src/op_analytics/datasources/velodrome/sugarwrapper.py @@ -14,6 +14,20 @@ log = structlog.get_logger() +def chain_cls_to_str(chain_cls: type) -> str: + """ + Convert a chain class like OPChain → 'op' or BaseChain → 'base'. + Extend this if you support more chains in future. + """ + name = chain_cls.__name__.lower() + if "opchain" in name: + return "op" + elif "basechain" in name: + return "base" + # Fallback or raise an error for unhandled cases + raise ValueError(f"Unrecognized chain class: {chain_cls.__name__}") + + # Map our chain names to the sugar sdk Chain class. 
SUGAR_CHAINS = { "op": OPChain(), From 9833369c63e88038043d79f8a9cf0d1a9458af2b Mon Sep 17 00:00:00 2001 From: Dennis Date: Wed, 26 Feb 2025 13:04:59 +0200 Subject: [PATCH 3/9] Support the new sugar-sdk version --- .../datasources/sugar/chain_list.py | 5 ++--- src/op_analytics/datasources/sugar/execute.py | 20 +++++++++---------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/op_analytics/datasources/sugar/chain_list.py b/src/op_analytics/datasources/sugar/chain_list.py index fa3f66b9602..8dcbb3f2ac9 100644 --- a/src/op_analytics/datasources/sugar/chain_list.py +++ b/src/op_analytics/datasources/sugar/chain_list.py @@ -1,4 +1,3 @@ -from sugar.chains import BaseChain, OPChain +from sugar.chains import AsyncBaseChain, AsyncOPChain -# So far, sugar-sdk only supports BaseChain and OPChain -chain_list = [BaseChain, OPChain] +chain_list = [AsyncBaseChain, AsyncOPChain] diff --git a/src/op_analytics/datasources/sugar/execute.py b/src/op_analytics/datasources/sugar/execute.py index 41f5fead0d8..627d04dffd4 100644 --- a/src/op_analytics/datasources/sugar/execute.py +++ b/src/op_analytics/datasources/sugar/execute.py @@ -1,28 +1,29 @@ -from sugar.chains import BaseChain, OPChain import polars as pl + +from sugar.chains import AsyncBaseChain, AsyncOPChain + from op_analytics.coreutils.logger import structlog from op_analytics.coreutils.partitioned.dailydatautils import dt_summary from op_analytics.dagster.assets.sugar import Sugar -from op_analytics.datasources.sugar.tokens import fetch_tokens_for_chain from op_analytics.datasources.sugar.pools import fetch_pools_for_chain from op_analytics.datasources.sugar.prices import fetch_prices_for_chain +from op_analytics.datasources.sugar.tokens import fetch_tokens_for_chain log = structlog.get_logger() -CHAIN_LIST = [OPChain, BaseChain] +CHAIN_LIST = [AsyncBaseChain, AsyncOPChain] def _collect_data() -> dict[str, list[pl.DataFrame]]: """ - Collects tokens, pools, and prices data from each chain in CHAIN_LIST, - 
by leveraging sugarwrapper.py calls indirectly via sugar.tokens, sugar.pools, sugar.prices. - Returns lists of DataFrames for each data type. + Gather tokens, pools, and prices DataFrames from each chain in CHAIN_LIST. + Returns a dict mapping "tokens", "pools", and "prices" to lists of DataFrames. """ all_data = {"tokens": [], "pools": [], "prices": []} for chain_cls in CHAIN_LIST: chain_name = chain_cls.__name__ - log.info(f"Fetching Sugar data for chain={chain_name}") + log.info("Fetching Sugar data", chain=chain_name) chain_tokens_df = fetch_tokens_for_chain(chain_cls) chain_pools_df = fetch_pools_for_chain(chain_cls) @@ -37,9 +38,8 @@ def _collect_data() -> dict[str, list[pl.DataFrame]]: def execute_pull() -> dict[str, dict]: """ - Main Sugar ingestion entrypoint. Fetches tokens, pools, and prices - for each chain in CHAIN_LIST, then writes them to the Sugar dataset. - Returns a summary of counts for logging. + Main entry point for Sugar ingestion logic. Fetches tokens, pools, and prices for each chain, + then writes them into Sugar partitions. 
""" collected = _collect_data() summary = {} From 317824079ac1796cf8189840591e6cbdeb427faf Mon Sep 17 00:00:00 2001 From: Dennis Date: Wed, 26 Feb 2025 13:42:05 +0200 Subject: [PATCH 4/9] update to support the new version --- pyproject.toml | 2 +- .../datasources/sugar/chain_list.py | 21 ++++++++++++++++++- src/op_analytics/datasources/sugar/execute.py | 11 ++++------ src/op_analytics/datasources/sugar/pools.py | 3 ++- src/op_analytics/datasources/sugar/prices.py | 3 ++- src/op_analytics/datasources/sugar/tokens.py | 3 ++- 6 files changed, 31 insertions(+), 12 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 6561f40d514..874169a0c2a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,7 +69,7 @@ dev-dependencies = [ ] [tool.uv.sources] -sugar = { git = "https://github.com/velodrome-finance/sugar-sdk" } +sugar = { git = "https://github.com/velodrome-finance/sugar-sdk", branch = "main" } [tool.ruff] # Allow lines to be as long as 100. diff --git a/src/op_analytics/datasources/sugar/chain_list.py b/src/op_analytics/datasources/sugar/chain_list.py index 8dcbb3f2ac9..28527ca7cdf 100644 --- a/src/op_analytics/datasources/sugar/chain_list.py +++ b/src/op_analytics/datasources/sugar/chain_list.py @@ -1,3 +1,22 @@ from sugar.chains import AsyncBaseChain, AsyncOPChain -chain_list = [AsyncBaseChain, AsyncOPChain] + +def chain_cls_to_str(chain_cls: type) -> str: + """ + Convert a chain class like OPChain → 'op' or BaseChain → 'base'. + Extend this if you support more chains in future. + """ + name = chain_cls.__name__.lower() + if "opchain" in name: + return "op" + elif "basechain" in name: + return "base" + # Fallback or raise an error for unhandled cases + raise ValueError(f"Unrecognized chain class: {chain_cls.__name__}") + + +# Map our chain names to the sugar sdk Chain class. 
+SUGAR_CHAINS = { + "op": AsyncBaseChain(), + "base": AsyncOPChain(), +} diff --git a/src/op_analytics/datasources/sugar/execute.py b/src/op_analytics/datasources/sugar/execute.py index 627d04dffd4..f013f259f8b 100644 --- a/src/op_analytics/datasources/sugar/execute.py +++ b/src/op_analytics/datasources/sugar/execute.py @@ -1,30 +1,27 @@ import polars as pl -from sugar.chains import AsyncBaseChain, AsyncOPChain - from op_analytics.coreutils.logger import structlog from op_analytics.coreutils.partitioned.dailydatautils import dt_summary from op_analytics.dagster.assets.sugar import Sugar +from op_analytics.datasources.sugar.chain_list import SUGAR_CHAINS from op_analytics.datasources.sugar.pools import fetch_pools_for_chain from op_analytics.datasources.sugar.prices import fetch_prices_for_chain from op_analytics.datasources.sugar.tokens import fetch_tokens_for_chain log = structlog.get_logger() -CHAIN_LIST = [AsyncBaseChain, AsyncOPChain] - def _collect_data() -> dict[str, list[pl.DataFrame]]: """ - Gather tokens, pools, and prices DataFrames from each chain in CHAIN_LIST. + Gather tokens, pools, and prices DataFrames for each chain in SUGAR_CHAINS. Returns a dict mapping "tokens", "pools", and "prices" to lists of DataFrames. 
""" all_data = {"tokens": [], "pools": [], "prices": []} - for chain_cls in CHAIN_LIST: - chain_name = chain_cls.__name__ + for chain_name, chain_instance in SUGAR_CHAINS.items(): log.info("Fetching Sugar data", chain=chain_name) + chain_cls = type(chain_instance) chain_tokens_df = fetch_tokens_for_chain(chain_cls) chain_pools_df = fetch_pools_for_chain(chain_cls) chain_prices_df = fetch_prices_for_chain(chain_cls) diff --git a/src/op_analytics/datasources/sugar/pools.py b/src/op_analytics/datasources/sugar/pools.py index e7a06dd39fc..af30daf4e40 100644 --- a/src/op_analytics/datasources/sugar/pools.py +++ b/src/op_analytics/datasources/sugar/pools.py @@ -2,7 +2,8 @@ import polars as pl from sugar.pool import LiquidityPool -from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools, chain_cls_to_str +from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools +from op_analytics.datasources.sugar.chain_list import chain_cls_to_str log = structlog.get_logger() diff --git a/src/op_analytics/datasources/sugar/prices.py b/src/op_analytics/datasources/sugar/prices.py index a6f51492993..ff95254a07c 100644 --- a/src/op_analytics/datasources/sugar/prices.py +++ b/src/op_analytics/datasources/sugar/prices.py @@ -3,7 +3,8 @@ from sugar.price import Price -from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools, chain_cls_to_str +from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools +from op_analytics.datasources.sugar.chain_list import chain_cls_to_str log = structlog.get_logger() diff --git a/src/op_analytics/datasources/sugar/tokens.py b/src/op_analytics/datasources/sugar/tokens.py index bc1968e1f12..3c9aaf5095c 100644 --- a/src/op_analytics/datasources/sugar/tokens.py +++ b/src/op_analytics/datasources/sugar/tokens.py @@ -1,7 +1,8 @@ from op_analytics.coreutils.logger import structlog import polars as pl -from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools, chain_cls_to_str +from 
op_analytics.datasources.velodrome.sugarwrapper import fetch_pools +from op_analytics.datasources.sugar.chain_list import chain_cls_to_str log = structlog.get_logger() From 21d3e765f9fee86fbed6a2db73f5f25218295698 Mon Sep 17 00:00:00 2001 From: Dennis Date: Wed, 26 Feb 2025 13:44:30 +0200 Subject: [PATCH 5/9] rename the sugar class and tables --- src/op_analytics/datasources/sugar/dataaccess.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/op_analytics/datasources/sugar/dataaccess.py b/src/op_analytics/datasources/sugar/dataaccess.py index c9f269d3273..fb4ae6e99ca 100644 --- a/src/op_analytics/datasources/sugar/dataaccess.py +++ b/src/op_analytics/datasources/sugar/dataaccess.py @@ -1,7 +1,7 @@ from op_analytics.coreutils.partitioned.dailydata import DailyDataset -class SugarDataAccess(DailyDataset): - TOKENS = "sugar_tokens_v1" - POOLS = "sugar_liquidity_pools_v1" - PRICES = "sugar_prices_v1" +class Sugar(DailyDataset): + TOKENS = "tokens_v1" + POOLS = "liquidity_pools_v1" + PRICES = "prices_v1" From 0827f02e1e2a4c7f8ca6c796e9f6760c35ee8546 Mon Sep 17 00:00:00 2001 From: Dennis Date: Wed, 26 Feb 2025 13:49:26 +0200 Subject: [PATCH 6/9] update sugarwrapper --- .../datasources/velodrome/sugarwrapper.py | 27 +++---------------- 1 file changed, 3 insertions(+), 24 deletions(-) diff --git a/src/op_analytics/datasources/velodrome/sugarwrapper.py b/src/op_analytics/datasources/velodrome/sugarwrapper.py index b7d2494b5eb..81c77ba67d6 100644 --- a/src/op_analytics/datasources/velodrome/sugarwrapper.py +++ b/src/op_analytics/datasources/velodrome/sugarwrapper.py @@ -1,7 +1,7 @@ import itertools from dataclasses import dataclass -from sugar.chains import BaseChain, OPChain, Chain +from sugar.chains import Chain from sugar.price import Price from sugar.token import Token from sugar.pool import LiquidityPool @@ -9,32 +9,11 @@ from op_analytics.coreutils.coroutines import run_coroutine from op_analytics.coreutils.logger import structlog - +from 
op_analytics.datasources.sugar.chain_list import SUGAR_CHAINS log = structlog.get_logger() -def chain_cls_to_str(chain_cls: type) -> str: - """ - Convert a chain class like OPChain → 'op' or BaseChain → 'base'. - Extend this if you support more chains in future. - """ - name = chain_cls.__name__.lower() - if "opchain" in name: - return "op" - elif "basechain" in name: - return "base" - # Fallback or raise an error for unhandled cases - raise ValueError(f"Unrecognized chain class: {chain_cls.__name__}") - - -# Map our chain names to the sugar sdk Chain class. -SUGAR_CHAINS = { - "op": OPChain(), - "base": BaseChain(), -} - - @dataclass(frozen=True) class MissingTokenInfo: """A LiquidityPool that could not be instantiated due to missing token information.""" @@ -134,7 +113,7 @@ async def _sugar_pools(chain: str, sugar_chain: Chain) -> VelodromePools: lps = [] missing = [] for pool in pools: - lp = LiquidityPool.from_tuple(pool, tokens_index) + lp = LiquidityPool.from_tuple(pool, tokens_index, prices) if lp is None: missing.append( From 1c677ea282197152ff1953dfda81bd17b7a1ccfa Mon Sep 17 00:00:00 2001 From: Dennis Date: Wed, 26 Feb 2025 14:14:34 +0200 Subject: [PATCH 7/9] done goofed up --- src/op_analytics/datasources/sugar/chain_list.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/op_analytics/datasources/sugar/chain_list.py b/src/op_analytics/datasources/sugar/chain_list.py index 28527ca7cdf..be1c86743b7 100644 --- a/src/op_analytics/datasources/sugar/chain_list.py +++ b/src/op_analytics/datasources/sugar/chain_list.py @@ -1,4 +1,4 @@ -from sugar.chains import AsyncBaseChain, AsyncOPChain +from sugar.chains import AsyncOPChain, AsyncBaseChain def chain_cls_to_str(chain_cls: type) -> str: @@ -16,7 +16,4 @@ def chain_cls_to_str(chain_cls: type) -> str: # Map our chain names to the sugar sdk Chain class. 
-SUGAR_CHAINS = { - "op": AsyncBaseChain(), - "base": AsyncOPChain(), -} +SUGAR_CHAINS = {"op": AsyncOPChain(), "base": AsyncBaseChain()} From 4dfe11483c9aad1a77cd12b600fdbb266e1c6105 Mon Sep 17 00:00:00 2001 From: Dennis Date: Wed, 26 Feb 2025 14:48:15 +0200 Subject: [PATCH 8/9] Rename and refactor --- .../dagster/assets/{sugar.py => velodrome.py} | 2 +- src/op_analytics/datasources/sugar/__init__.py | 0 .../datasources/{sugar => velodrome}/chain_list.py | 0 .../datasources/{sugar => velodrome}/dataaccess.py | 2 +- .../datasources/{sugar => velodrome}/execute.py | 9 ++++----- .../datasources/{sugar => velodrome}/pools.py | 2 +- .../datasources/{sugar => velodrome}/prices.py | 2 +- src/op_analytics/datasources/velodrome/sugarwrapper.py | 2 +- .../datasources/{sugar => velodrome}/tokens.py | 2 +- 9 files changed, 10 insertions(+), 11 deletions(-) rename src/op_analytics/dagster/assets/{sugar.py => velodrome.py} (93%) delete mode 100644 src/op_analytics/datasources/sugar/__init__.py rename src/op_analytics/datasources/{sugar => velodrome}/chain_list.py (100%) rename src/op_analytics/datasources/{sugar => velodrome}/dataaccess.py (83%) rename src/op_analytics/datasources/{sugar => velodrome}/execute.py (83%) rename src/op_analytics/datasources/{sugar => velodrome}/pools.py (96%) rename src/op_analytics/datasources/{sugar => velodrome}/prices.py (94%) rename src/op_analytics/datasources/{sugar => velodrome}/tokens.py (94%) diff --git a/src/op_analytics/dagster/assets/sugar.py b/src/op_analytics/dagster/assets/velodrome.py similarity index 93% rename from src/op_analytics/dagster/assets/sugar.py rename to src/op_analytics/dagster/assets/velodrome.py index 12e04e53f36..825a3115a5f 100644 --- a/src/op_analytics/dagster/assets/sugar.py +++ b/src/op_analytics/dagster/assets/velodrome.py @@ -1,7 +1,7 @@ from op_analytics.coreutils.partitioned.dailydata import DailyDataset -class Sugar(DailyDataset): +class Velodrome(DailyDataset): """ The Sugar dataset tracks tokens, pools, 
and prices from the Velodrome sugar-sdk. See also: diff --git a/src/op_analytics/datasources/sugar/__init__.py b/src/op_analytics/datasources/sugar/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/op_analytics/datasources/sugar/chain_list.py b/src/op_analytics/datasources/velodrome/chain_list.py similarity index 100% rename from src/op_analytics/datasources/sugar/chain_list.py rename to src/op_analytics/datasources/velodrome/chain_list.py diff --git a/src/op_analytics/datasources/sugar/dataaccess.py b/src/op_analytics/datasources/velodrome/dataaccess.py similarity index 83% rename from src/op_analytics/datasources/sugar/dataaccess.py rename to src/op_analytics/datasources/velodrome/dataaccess.py index fb4ae6e99ca..4f8071ad2d5 100644 --- a/src/op_analytics/datasources/sugar/dataaccess.py +++ b/src/op_analytics/datasources/velodrome/dataaccess.py @@ -1,7 +1,7 @@ from op_analytics.coreutils.partitioned.dailydata import DailyDataset -class Sugar(DailyDataset): +class Velodrome(DailyDataset): TOKENS = "tokens_v1" POOLS = "liquidity_pools_v1" PRICES = "prices_v1" diff --git a/src/op_analytics/datasources/sugar/execute.py b/src/op_analytics/datasources/velodrome/execute.py similarity index 83% rename from src/op_analytics/datasources/sugar/execute.py rename to src/op_analytics/datasources/velodrome/execute.py index f013f259f8b..56780af6879 100644 --- a/src/op_analytics/datasources/sugar/execute.py +++ b/src/op_analytics/datasources/velodrome/execute.py @@ -2,11 +2,10 @@ from op_analytics.coreutils.logger import structlog from op_analytics.coreutils.partitioned.dailydatautils import dt_summary -from op_analytics.dagster.assets.sugar import Sugar -from op_analytics.datasources.sugar.chain_list import SUGAR_CHAINS -from op_analytics.datasources.sugar.pools import fetch_pools_for_chain -from op_analytics.datasources.sugar.prices import fetch_prices_for_chain -from op_analytics.datasources.sugar.tokens import fetch_tokens_for_chain +from 
op_analytics.datasources.velodrome.chain_list import SUGAR_CHAINS +from op_analytics.datasources.velodrome.pools import fetch_pools_for_chain +from op_analytics.datasources.velodrome.prices import fetch_prices_for_chain +from op_analytics.datasources.velodrome.tokens import fetch_tokens_for_chain log = structlog.get_logger() diff --git a/src/op_analytics/datasources/sugar/pools.py b/src/op_analytics/datasources/velodrome/pools.py similarity index 96% rename from src/op_analytics/datasources/sugar/pools.py rename to src/op_analytics/datasources/velodrome/pools.py index af30daf4e40..1034f3239c1 100644 --- a/src/op_analytics/datasources/sugar/pools.py +++ b/src/op_analytics/datasources/velodrome/pools.py @@ -3,7 +3,7 @@ from sugar.pool import LiquidityPool from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools -from op_analytics.datasources.sugar.chain_list import chain_cls_to_str +from op_analytics.datasources.velodrome.chain_list import chain_cls_to_str log = structlog.get_logger() diff --git a/src/op_analytics/datasources/sugar/prices.py b/src/op_analytics/datasources/velodrome/prices.py similarity index 94% rename from src/op_analytics/datasources/sugar/prices.py rename to src/op_analytics/datasources/velodrome/prices.py index ff95254a07c..d72a1ebcf7b 100644 --- a/src/op_analytics/datasources/sugar/prices.py +++ b/src/op_analytics/datasources/velodrome/prices.py @@ -4,7 +4,7 @@ from sugar.price import Price from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools -from op_analytics.datasources.sugar.chain_list import chain_cls_to_str +from op_analytics.datasources.velodrome.chain_list import chain_cls_to_str log = structlog.get_logger() diff --git a/src/op_analytics/datasources/velodrome/sugarwrapper.py b/src/op_analytics/datasources/velodrome/sugarwrapper.py index 81c77ba67d6..2adebb24964 100644 --- a/src/op_analytics/datasources/velodrome/sugarwrapper.py +++ b/src/op_analytics/datasources/velodrome/sugarwrapper.py @@ -9,7 +9,7 @@ 
from op_analytics.coreutils.coroutines import run_coroutine from op_analytics.coreutils.logger import structlog -from op_analytics.datasources.sugar.chain_list import SUGAR_CHAINS +from op_analytics.datasources.velodrome.chain_list import SUGAR_CHAINS log = structlog.get_logger() diff --git a/src/op_analytics/datasources/sugar/tokens.py b/src/op_analytics/datasources/velodrome/tokens.py similarity index 94% rename from src/op_analytics/datasources/sugar/tokens.py rename to src/op_analytics/datasources/velodrome/tokens.py index 3c9aaf5095c..3a835c0bb14 100644 --- a/src/op_analytics/datasources/sugar/tokens.py +++ b/src/op_analytics/datasources/velodrome/tokens.py @@ -2,7 +2,7 @@ import polars as pl from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools -from op_analytics.datasources.sugar.chain_list import chain_cls_to_str +from op_analytics.datasources.velodrome.chain_list import chain_cls_to_str log = structlog.get_logger() From 6ff6dd099fa20b53d1616d53c70b6e464d971a32 Mon Sep 17 00:00:00 2001 From: Dennis Date: Mon, 31 Mar 2025 16:18:44 +0300 Subject: [PATCH 9/9] finalize implementation --- .../datasources/velodrome/chain_list.py | 5 ++++- .../datasources/velodrome/execute.py | 22 +++++++++++++------ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/op_analytics/datasources/velodrome/chain_list.py b/src/op_analytics/datasources/velodrome/chain_list.py index be1c86743b7..938d2f31334 100644 --- a/src/op_analytics/datasources/velodrome/chain_list.py +++ b/src/op_analytics/datasources/velodrome/chain_list.py @@ -16,4 +16,7 @@ def chain_cls_to_str(chain_cls: type) -> str: # Map our chain names to the sugar sdk Chain class. 
-SUGAR_CHAINS = {"op": AsyncOPChain(), "base": AsyncBaseChain()} +SUGAR_CHAINS = { +    "op": AsyncOPChain(), +    "base": AsyncBaseChain(), +} diff --git a/src/op_analytics/datasources/velodrome/execute.py b/src/op_analytics/datasources/velodrome/execute.py index 56780af6879..0a6319386ac 100644 --- a/src/op_analytics/datasources/velodrome/execute.py +++ b/src/op_analytics/datasources/velodrome/execute.py @@ -6,6 +6,7 @@ from op_analytics.datasources.velodrome.pools import fetch_pools_for_chain from op_analytics.datasources.velodrome.prices import fetch_prices_for_chain from op_analytics.datasources.velodrome.tokens import fetch_tokens_for_chain +from .dataaccess import Velodrome log = structlog.get_logger() @@ -18,7 +19,7 @@ def _collect_data() -> dict[str, list[pl.DataFrame]]: all_data = {"tokens": [], "pools": [], "prices": []} for chain_name, chain_instance in SUGAR_CHAINS.items(): - log.info("Fetching Sugar data", chain=chain_name) + log.info("Fetching Velodrome data through sugar-sdk", chain=chain_name) chain_cls = type(chain_instance) chain_tokens_df = fetch_tokens_for_chain(chain_cls) @@ -34,20 +35,27 @@ def _collect_data() -> dict[str, list[pl.DataFrame]]: def execute_pull() -> dict[str, dict]: """ - Main entry point for Sugar ingestion logic. Fetches tokens, pools, and prices for each chain, - then writes them into Sugar partitions. + Main entry point for Velodrome data ingestion logic. Fetches tokens, pools, and prices for each chain, then writes the results into Velodrome partitions. + + Returns: + dict: Summary of the data ingestion process. 
""" + log.info("Starting Velodrome data ingestion") + collected = _collect_data() summary = {} - Sugar.write_dataset(collected["tokens"], Sugar.TOKENS) + Velodrome.write_dataset(collected["tokens"], Velodrome.TOKENS) summary["tokens_df"] = dt_summary(collected["tokens"]) + log.info("Tokens data written successfully", count=len(collected["tokens"])) - Sugar.write_dataset(collected["pools"], Sugar.POOLS) + Velodrome.write_dataset(collected["pools"], Velodrome.POOLS) summary["pools_df"] = dt_summary(collected["pools"]) + log.info("Pools data written successfully", count=len(collected["pools"])) - Sugar.write_dataset(collected["prices"], Sugar.PRICES) + Velodrome.write_dataset(collected["prices"], Velodrome.PRICES) summary["prices_df"] = dt_summary(collected["prices"]) + log.info("Prices data written successfully", count=len(collected["prices"])) - log.info("Sugar ingestion completed", summary=summary) + log.info("Velodrome ingestion completed", summary=summary) return summary