diff --git a/pyproject.toml b/pyproject.toml
index 16e17cb9799..0bf502e7eb3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -70,7 +70,7 @@ dev-dependencies = [
 ]
 
 [tool.uv.sources]
-sugar = { git = "https://github.com/velodrome-finance/sugar-sdk" }
+sugar = { git = "https://github.com/velodrome-finance/sugar-sdk", branch = "main" }
 
 [tool.ruff]
 # Allow lines to be as long as 100.
diff --git a/src/op_analytics/dagster/assets/velodrome.py b/src/op_analytics/dagster/assets/velodrome.py
new file mode 100644
index 00000000000..825a3115a5f
--- /dev/null
+++ b/src/op_analytics/dagster/assets/velodrome.py
@@ -0,0 +1,19 @@
+from op_analytics.coreutils.partitioned.dailydata import DailyDataset
+
+
+class Velodrome(DailyDataset):
+    """
+    The Sugar dataset tracks tokens, pools, and prices from the Velodrome sugar-sdk.
+    See also:
+    - https://github.com/velodrome-finance/sugar
+    - https://github.com/velodrome-finance/sugar-sdk
+
+    Tables:
+    - tokens_v1
+    - liquidity_pools_v1
+    - prices_v1
+    """
+
+    TOKENS = "tokens_v1"
+    POOLS = "liquidity_pools_v1"
+    PRICES = "prices_v1"
diff --git a/src/op_analytics/datasources/velodrome/chain_list.py b/src/op_analytics/datasources/velodrome/chain_list.py
new file mode 100644
index 00000000000..938d2f31334
--- /dev/null
+++ b/src/op_analytics/datasources/velodrome/chain_list.py
@@ -0,0 +1,22 @@
+from sugar.chains import AsyncOPChain, AsyncBaseChain
+
+
+def chain_cls_to_str(chain_cls: type) -> str:
+    """
+    Convert a chain class like OPChain → 'op' or BaseChain → 'base'.
+    Extend this if you support more chains in future.
+    """
+    name = chain_cls.__name__.lower()
+    if "opchain" in name:
+        return "op"
+    elif "basechain" in name:
+        return "base"
+    # Fallback or raise an error for unhandled cases
+    raise ValueError(f"Unrecognized chain class: {chain_cls.__name__}")
+
+
+# Map our chain names to the sugar sdk Chain class.
+SUGAR_CHAINS = {
+    "op": AsyncOPChain(),
+    "base": AsyncBaseChain(),
+}
diff --git a/src/op_analytics/datasources/velodrome/dataaccess.py b/src/op_analytics/datasources/velodrome/dataaccess.py
new file mode 100644
index 00000000000..4f8071ad2d5
--- /dev/null
+++ b/src/op_analytics/datasources/velodrome/dataaccess.py
@@ -0,0 +1,7 @@
+from op_analytics.coreutils.partitioned.dailydata import DailyDataset
+
+
+class Velodrome(DailyDataset):
+    TOKENS = "tokens_v1"
+    POOLS = "liquidity_pools_v1"
+    PRICES = "prices_v1"
diff --git a/src/op_analytics/datasources/velodrome/execute.py b/src/op_analytics/datasources/velodrome/execute.py
new file mode 100644
index 00000000000..0a6319386ac
--- /dev/null
+++ b/src/op_analytics/datasources/velodrome/execute.py
@@ -0,0 +1,61 @@
+import polars as pl
+
+from op_analytics.coreutils.logger import structlog
+from op_analytics.coreutils.partitioned.dailydatautils import dt_summary
+from op_analytics.datasources.velodrome.chain_list import SUGAR_CHAINS
+from op_analytics.datasources.velodrome.pools import fetch_pools_for_chain
+from op_analytics.datasources.velodrome.prices import fetch_prices_for_chain
+from op_analytics.datasources.velodrome.tokens import fetch_tokens_for_chain
+from .dataaccess import Velodrome
+
+log = structlog.get_logger()
+
+
+def _collect_data() -> dict[str, list[pl.DataFrame]]:
+    """
+    Gather tokens, pools, and prices DataFrames for each chain in SUGAR_CHAINS.
+    Returns a dict mapping "tokens", "pools", and "prices" to lists of DataFrames.
+    """
+    all_data = {"tokens": [], "pools": [], "prices": []}
+
+    for chain_name, chain_instance in SUGAR_CHAINS.items():
+        log.info("Fetching Velodrome data through sugar-sdk", chain=chain_name)
+
+        chain_cls = type(chain_instance)
+        chain_tokens_df = fetch_tokens_for_chain(chain_cls)
+        chain_pools_df = fetch_pools_for_chain(chain_cls)
+        chain_prices_df = fetch_prices_for_chain(chain_cls)
+
+        all_data["tokens"].append(chain_tokens_df)
+        all_data["pools"].append(chain_pools_df)
+        all_data["prices"].append(chain_prices_df)
+
+    return all_data
+
+
+def execute_pull() -> dict[str, dict]:
+    """
+    Main entry point for Velodrome data ingestion logic. Fetches tokens, pools, and prices for each chain, then writes each dataset to storage.
+
+    Returns:
+        dict: Summary of the data ingestion process.
+    """
+    log.info("Starting Velodrome data ingestion")
+
+    collected = _collect_data()
+    summary = {}
+
+    Velodrome.write_dataset(collected["tokens"], Velodrome.TOKENS)
+    summary["tokens_df"] = dt_summary(collected["tokens"])
+    log.info("Tokens data written successfully", count=len(collected["tokens"]))
+
+    Velodrome.write_dataset(collected["pools"], Velodrome.POOLS)
+    summary["pools_df"] = dt_summary(collected["pools"])
+    log.info("Pools data written successfully", count=len(collected["pools"]))
+
+    Velodrome.write_dataset(collected["prices"], Velodrome.PRICES)
+    summary["prices_df"] = dt_summary(collected["prices"])
+    log.info("Prices data written successfully", count=len(collected["prices"]))
+
+    log.info("Velodrome ingestion completed", summary=summary)
+    return summary
diff --git a/src/op_analytics/datasources/velodrome/pools.py b/src/op_analytics/datasources/velodrome/pools.py
new file mode 100644
index 00000000000..1034f3239c1
--- /dev/null
+++ b/src/op_analytics/datasources/velodrome/pools.py
@@ -0,0 +1,66 @@
+from op_analytics.coreutils.logger import structlog
+import polars as pl
+from sugar.pool import LiquidityPool
+
+from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools
+from op_analytics.datasources.velodrome.chain_list import chain_cls_to_str
+
+log = structlog.get_logger()
+
+POOLS_SCHEMA = {
+    "chain": pl.Utf8,
+    "lp": pl.Utf8,
+    "factory": pl.Utf8,
+    "symbol": pl.Utf8,
+    "is_stable": pl.Boolean,
+    "total_supply": pl.Float64,
+    "decimals": pl.Int64,
+    "token0": pl.Utf8,
+    "token1": pl.Utf8,
+    "pool_fee": pl.Float64,
+    "gauge_total_supply": pl.Float64,
+    "emissions_token": pl.Utf8,
+    "nfpm": pl.Utf8,
+    "alm": pl.Utf8,
+}
+
+
+def fetch_pools_for_chain(chain_cls: type) -> pl.DataFrame:
+    """
+    Fetch pool data from sugarwrapper.py. We then map the returned LiquidityPool objects
+    into a Polars DataFrame defined by POOLS_SCHEMA.
+    """
+    chain_str = chain_cls_to_str(chain_cls)
+    log.info(f"Fetching pools for {chain_str}")
+
+    velodrome_pools = fetch_pools(chain_str)
+    raw_pools = velodrome_pools.pools  # list[LiquidityPool]
+
+    pool_records = []
+    for lp in raw_pools:
+        if not isinstance(lp, LiquidityPool):
+            continue
+        pool_records.append(
+            {
+                "chain": chain_str,
+                "lp": lp.lp,
+                "symbol": lp.symbol,
+                "factory": lp.factory,
+                "is_stable": lp.is_stable,
+                "total_supply": float(lp.total_supply),
+                "decimals": int(lp.decimals),
+                "token0": lp.token0.token_address if lp.token0 else "unknown",
+                "token1": lp.token1.token_address if lp.token1 else "unknown",
+                "pool_fee": float(lp.pool_fee),
+                "gauge_total_supply": float(lp.gauge_total_supply),
+                "emissions_token": lp.emissions_token.token_address
+                if lp.emissions_token
+                else "unknown",
+                "nfpm": lp.nfpm,
+                "alm": lp.alm,
+            }
+        )
+
+    df = pl.DataFrame(pool_records, schema=POOLS_SCHEMA)
+    log.info(f"{chain_str} returned {df.height} pools.")
+    return df
diff --git a/src/op_analytics/datasources/velodrome/prices.py b/src/op_analytics/datasources/velodrome/prices.py
new file mode 100644
index 00000000000..d72a1ebcf7b
--- /dev/null
+++ b/src/op_analytics/datasources/velodrome/prices.py
@@ -0,0 +1,43 @@
+from op_analytics.coreutils.logger import structlog
+import polars as pl
+
+from sugar.price import Price
+
+from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools
+from op_analytics.datasources.velodrome.chain_list import chain_cls_to_str
+
+log = structlog.get_logger()
+
+PRICES_SCHEMA = {
+    "chain": pl.Utf8,
+    "token_address": pl.Utf8,
+    "price": pl.Float64,
+}
+
+
+def fetch_prices_for_chain(chain_cls: type) -> pl.DataFrame:
+    """
+    Fetch token prices for each chain by leveraging sugarwrapper.py's fetch_pools().
+    We then build a DataFrame from the returned Price objects.
+    """
+    chain_str = chain_cls_to_str(chain_cls)
+    log.info(f"Fetching prices for {chain_str} via sugarwrapper fetch_pools()")
+
+    velodrome_pools = fetch_pools(chain_str)
+    prices = velodrome_pools.prices  # list[Price]
+
+    price_records = []
+    for cp in prices:
+        if not isinstance(cp, Price):
+            continue
+        price_records.append(
+            {
+                "chain": chain_str,
+                "token_address": cp.token.token_address,
+                "price": cp.price,
+            }
+        )
+
+    df = pl.DataFrame(price_records, schema=PRICES_SCHEMA)
+    log.info(f"{chain_str} returned {df.height} prices.")
+    return df
diff --git a/src/op_analytics/datasources/velodrome/sugarwrapper.py b/src/op_analytics/datasources/velodrome/sugarwrapper.py
index c469cf8941f..2adebb24964 100644
--- a/src/op_analytics/datasources/velodrome/sugarwrapper.py
+++ b/src/op_analytics/datasources/velodrome/sugarwrapper.py
@@ -1,7 +1,7 @@
 import itertools
 from dataclasses import dataclass
 
-from sugar.chains import BaseChain, OPChain, Chain
+from sugar.chains import Chain
 from sugar.price import Price
 from sugar.token import Token
 from sugar.pool import LiquidityPool
@@ -9,18 +9,11 @@
 from op_analytics.coreutils.coroutines import run_coroutine
 from op_analytics.coreutils.logger import structlog
 
-
+from op_analytics.datasources.velodrome.chain_list import SUGAR_CHAINS
 
 log = structlog.get_logger()
 
-# Map our chain names to the sugar sdk Chain class. 
-SUGAR_CHAINS = {
-    "op": OPChain(),
-    "base": BaseChain(),
-}
-
-
 @dataclass(frozen=True)
 class MissingTokenInfo:
     """A LiquidityPool that could not be instantiated due to missing token information."""
 
@@ -120,7 +113,7 @@ async def _sugar_pools(chain: str, sugar_chain: Chain) -> VelodromePools:
     lps = []
     missing = []
     for pool in pools:
-        lp = LiquidityPool.from_tuple(pool, tokens_index)
+        lp = LiquidityPool.from_tuple(pool, tokens_index, prices)
 
         if lp is None:
             missing.append(
diff --git a/src/op_analytics/datasources/velodrome/tokens.py b/src/op_analytics/datasources/velodrome/tokens.py
new file mode 100644
index 00000000000..3a835c0bb14
--- /dev/null
+++ b/src/op_analytics/datasources/velodrome/tokens.py
@@ -0,0 +1,44 @@
+from op_analytics.coreutils.logger import structlog
+import polars as pl
+
+from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools
+from op_analytics.datasources.velodrome.chain_list import chain_cls_to_str
+
+log = structlog.get_logger()
+
+TOKEN_SCHEMA = {
+    "chain": pl.Utf8,
+    "token_address": pl.Utf8,
+    "symbol": pl.Utf8,
+    "decimals": pl.Int64,
+    "listed": pl.Boolean,
+}
+
+
+def fetch_tokens_for_chain(chain_cls: type) -> pl.DataFrame:
+    """
+    Fetch token metadata for a single chain by leveraging sugarwrapper.py's fetch_pools().
+    Returns a Polars DataFrame matching TOKEN_SCHEMA.
+    """
+    chain_str = chain_cls_to_str(chain_cls)
+    log.info(f"Fetching tokens for {chain_str} via sugarwrapper fetch_pools()")
+
+    # fetch_pools() returns a VelodromePools dataclass; we only need tokens here
+    velodrome_pools = fetch_pools(chain_str)
+    tokens = velodrome_pools.tokens  # a list of sugar.token.Token objects
+
+    token_records = []
+    for t in tokens:
+        token_records.append(
+            {
+                "chain": chain_str,
+                "token_address": t.token_address,
+                "symbol": t.symbol,
+                "decimals": t.decimals,
+                "listed": t.listed,  # if the Token object has a 'listed' attribute
+            }
+        )
+
+    df = pl.DataFrame(token_records, schema=TOKEN_SCHEMA)
+    log.info(f"{chain_str} returned {df.height} tokens.")
+    return df