2 changes: 1 addition & 1 deletion pyproject.toml
@@ -70,7 +70,7 @@ dev-dependencies = [
]

[tool.uv.sources]
sugar = { git = "https://github.com/velodrome-finance/sugar-sdk" }
sugar = { git = "https://github.com/velodrome-finance/sugar-sdk", branch = "main" }

[tool.ruff]
# Allow lines to be as long as 100.
19 changes: 19 additions & 0 deletions src/op_analytics/dagster/assets/velodrome.py
@@ -0,0 +1,19 @@
from op_analytics.coreutils.partitioned.dailydata import DailyDataset


class Velodrome(DailyDataset):
"""
Tracks tokens, pools, and prices fetched from Velodrome via the sugar-sdk.
See also:
- https://github.com/velodrome-finance/sugar
- https://github.com/velodrome-finance/sugar-sdk

Tables:
- tokens_v1
- pools_v1
- prices_v1
"""

TOKENS = "tokens_v1"
POOLS = "pools_v1"
PRICES = "prices_v1"
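
A quick orientation sketch (an assumption, not part of the PR: this treats DailyDataset as an Enum-like base whose member values are the table names, consistent with how execute.py passes them to write_dataset):

# Hypothetical usage -- assumes DailyDataset members iterate like an Enum
# and expose the physical table name via .value.
from op_analytics.dagster.assets.velodrome import Velodrome

for member in Velodrome:
    print(member.name, "->", member.value)
# TOKENS -> tokens_v1
# POOLS -> pools_v1
# PRICES -> prices_v1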
22 changes: 22 additions & 0 deletions src/op_analytics/datasources/velodrome/chain_list.py
@@ -0,0 +1,22 @@
from sugar.chains import AsyncOPChain, AsyncBaseChain


def chain_cls_to_str(chain_cls: type) -> str:
"""
Convert a chain class to our chain identifier, e.g. AsyncOPChain → 'op'
and AsyncBaseChain → 'base'. Extend this mapping when more chains are supported.
"""
name = chain_cls.__name__.lower()
if "opchain" in name:
return "op"
elif "basechain" in name:
return "base"
# No silent fallback: raise so unrecognized chain classes surface early.
raise ValueError(f"Unrecognized chain class: {chain_cls.__name__}")


# Map our chain names to the sugar sdk Chain class.
SUGAR_CHAINS = {
"op": AsyncOPChain(),
"base": AsyncBaseChain(),
}
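
A usage sketch for chain_cls_to_str (both assertions follow from the substring checks above, since the lowered class names are 'asyncopchain' and 'asyncbasechain'):

# Usage sketch; the classes are the same sugar-sdk imports used above.
from sugar.chains import AsyncOPChain, AsyncBaseChain

assert chain_cls_to_str(AsyncOPChain) == "op"      # "opchain" in "asyncopchain"
assert chain_cls_to_str(AsyncBaseChain) == "base"  # "basechain" in "asyncbasechain"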
7 changes: 7 additions & 0 deletions src/op_analytics/datasources/velodrome/dataaccess.py
@@ -0,0 +1,7 @@
from op_analytics.coreutils.partitioned.dailydata import DailyDataset


class Velodrome(DailyDataset):
TOKENS = "tokens_v1"
POOLS = "liquidity_pools_v1"
PRICES = "prices_v1"
61 changes: 61 additions & 0 deletions src/op_analytics/datasources/velodrome/execute.py
@@ -0,0 +1,61 @@
import polars as pl

from op_analytics.coreutils.logger import structlog
from op_analytics.coreutils.partitioned.dailydatautils import dt_summary
from op_analytics.datasources.velodrome.chain_list import SUGAR_CHAINS
from op_analytics.datasources.velodrome.pools import fetch_pools_for_chain
from op_analytics.datasources.velodrome.prices import fetch_prices_for_chain
from op_analytics.datasources.velodrome.tokens import fetch_tokens_for_chain
from .dataaccess import Velodrome

log = structlog.get_logger()


def _collect_data() -> dict[str, list[pl.DataFrame]]:
"""
Gather tokens, pools, and prices DataFrames for each chain in SUGAR_CHAINS.
Returns a dict mapping "tokens", "pools", and "prices" to lists of DataFrames.
"""
all_data: dict[str, list[pl.DataFrame]] = {"tokens": [], "pools": [], "prices": []}

for chain_name, chain_instance in SUGAR_CHAINS.items():
log.info("Fetching Velodrome data through sugar-sdk", chain=chain_name)

chain_cls = type(chain_instance)
chain_tokens_df = fetch_tokens_for_chain(chain_cls)
chain_pools_df = fetch_pools_for_chain(chain_cls)
chain_prices_df = fetch_prices_for_chain(chain_cls)

all_data["tokens"].append(chain_tokens_df)
all_data["pools"].append(chain_pools_df)
all_data["prices"].append(chain_prices_df)

return all_data


def execute_pull() -> dict[str, dict]:
"""
Main entry point for Velodrome data ingestion: fetch tokens, pools, and prices for each chain, write the three datasets, and return a summary.

Returns:
dict: Summary of the data ingestion process.
"""
log.info("Starting Velodrome data ingestion")

collected = _collect_data()
summary = {}

Velodrome.write_dataset(collected["tokens"], Velodrome.TOKENS)
summary["tokens_df"] = dt_summary(collected["tokens"])
log.info("Tokens data written successfully", count=len(collected["tokens"]))

Velodrome.write_dataset(collected["pools"], Velodrome.POOLS)
summary["pools_df"] = dt_summary(collected["pools"])
log.info("Pools data written successfully", count=len(collected["pools"]))

Velodrome.write_dataset(collected["prices"], Velodrome.PRICES)
summary["prices_df"] = dt_summary(collected["prices"])
log.info("Prices data written successfully", count=len(collected["prices"]))

log.info("Velodrome ingestion completed", summary=summary)
return summary
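
A hedged invocation sketch (the wrapper function here is hypothetical; execute_pull() is the real entry point defined above):

# Hypothetical caller -- e.g. a daily scheduled job or Dagster op.
from op_analytics.datasources.velodrome.execute import execute_pull

def run_daily_velodrome_pull() -> None:
    summary = execute_pull()
    # Each value is a dt_summary over the DataFrames written for that dataset.
    for name, info in summary.items():
        print(name, info)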
66 changes: 66 additions & 0 deletions src/op_analytics/datasources/velodrome/pools.py
@@ -0,0 +1,66 @@
from op_analytics.coreutils.logger import structlog
import polars as pl
from sugar.pool import LiquidityPool

from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools
from op_analytics.datasources.velodrome.chain_list import chain_cls_to_str

log = structlog.get_logger()

POOLS_SCHEMA = {
"chain": pl.Utf8,
"lp": pl.Utf8,
"factory": pl.Utf8,
"symbol": pl.Utf8,
"is_stable": pl.Boolean,
"total_supply": pl.Float64,
"decimals": pl.Int64,
"token0": pl.Utf8,
"token1": pl.Utf8,
"pool_fee": pl.Float64,
"gauge_total_supply": pl.Float64,
"emissions_token": pl.Utf8,
"nfpm": pl.Utf8,
"alm": pl.Utf8,
}


def fetch_pools_for_chain(chain_cls: type) -> pl.DataFrame:
"""
Fetch pool data via sugarwrapper.fetch_pools() and map the returned LiquidityPool
objects into a Polars DataFrame matching POOLS_SCHEMA.
"""
chain_str = chain_cls_to_str(chain_cls)
log.info(f"Fetching pools for {chain_str}")

velodrome_pools = fetch_pools(chain_str)
raw_pools = velodrome_pools.pools # list[LiquidityPool]

pool_records = []
for lp in raw_pools:
if not isinstance(lp, LiquidityPool):
continue
pool_records.append(
{
"chain": chain_str,
"lp": lp.lp,
"symbol": lp.symbol,
"factory": lp.factory,
"is_stable": lp.is_stable,
"total_supply": float(lp.total_supply),
"decimals": int(lp.decimals),
"token0": lp.token0.token_address if lp.token0 else "unknown",
"token1": lp.token1.token_address if lp.token1 else "unknown",
"pool_fee": float(lp.pool_fee),
"gauge_total_supply": float(lp.gauge_total_supply),
"emissions_token": lp.emissions_token.token_address
if lp.emissions_token
else "unknown",
"nfpm": lp.nfpm,
"alm": lp.alm,
}
)

df = pl.DataFrame(pool_records, schema=POOLS_SCHEMA)
log.info(f"{chain_str} returned {df.height} pools.")
return df
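
As an illustration of downstream use (a sketch, not part of the PR), the resulting frame slices with ordinary Polars expressions:

# Illustrative only: stable pools by total supply, using the fetcher above.
from sugar.chains import AsyncOPChain

pools_df = fetch_pools_for_chain(AsyncOPChain)
stable = pools_df.filter(pl.col("is_stable")).select("symbol", "pool_fee", "total_supply")
print(stable.sort("total_supply", descending=True).head())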
43 changes: 43 additions & 0 deletions src/op_analytics/datasources/velodrome/prices.py
@@ -0,0 +1,43 @@
from op_analytics.coreutils.logger import structlog
import polars as pl

from sugar.price import Price

from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools
from op_analytics.datasources.velodrome.chain_list import chain_cls_to_str

log = structlog.get_logger()

PRICES_SCHEMA = {
"chain": pl.Utf8,
"token_address": pl.Utf8,
"price": pl.Float64,
}


def fetch_prices_for_chain(chain_cls: type) -> pl.DataFrame:
"""
Fetch token prices for a single chain by leveraging sugarwrapper's fetch_pools(),
then build a DataFrame from the returned Price objects.
"""
chain_str = chain_cls_to_str(chain_cls)
log.info(f"Fetching prices for {chain_str} via sugarwrapper fetch_pools()")

velodrome_pools = fetch_pools(chain_str)
prices = velodrome_pools.prices # list[Price]

price_records = []
for cp in prices:
if not isinstance(cp, Price):
continue
price_records.append(
{
"chain": chain_str,
"token_address": cp.token.token_address,
"price": cp.price,
}
)

df = pl.DataFrame(price_records, schema=PRICES_SCHEMA)
log.info(f"{chain_str} returned {df.height} prices.")
return df
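
Tokens and prices share the (chain, token_address) key, so a left join attaches prices to token metadata; a sketch assuming both fetchers from this PR:

# Illustrative only: join token metadata with prices on the shared key.
from sugar.chains import AsyncOPChain
from op_analytics.datasources.velodrome.tokens import fetch_tokens_for_chain

tokens_df = fetch_tokens_for_chain(AsyncOPChain)
prices_df = fetch_prices_for_chain(AsyncOPChain)
priced_tokens = tokens_df.join(prices_df, on=["chain", "token_address"], how="left")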
13 changes: 3 additions & 10 deletions src/op_analytics/datasources/velodrome/sugarwrapper.py
@@ -1,26 +1,19 @@
import itertools
from dataclasses import dataclass

from sugar.chains import BaseChain, OPChain, Chain
from sugar.chains import Chain
from sugar.price import Price
from sugar.token import Token
from sugar.pool import LiquidityPool
from sugar.helpers import normalize_address

from op_analytics.coreutils.coroutines import run_coroutine
from op_analytics.coreutils.logger import structlog

from op_analytics.datasources.velodrome.chain_list import SUGAR_CHAINS

log = structlog.get_logger()


# Map our chain names to the sugar sdk Chain class.
SUGAR_CHAINS = {
"op": OPChain(),
"base": BaseChain(),
}


@dataclass(frozen=True)
class MissingTokenInfo:
"""A LiquidityPool that could not be instantiated due to missing token information."""
@@ -120,7 +113,7 @@ async def _sugar_pools(chain: str, sugar_chain: Chain) -> VelodromePools:
lps = []
missing = []
for pool in pools:
lp = LiquidityPool.from_tuple(pool, tokens_index)
lp = LiquidityPool.from_tuple(pool, tokens_index, prices)
@lithium323 (Contributor, Author):

We have a bunch of missing Token information, mostly for native tokens. Not sure how to fix this yet.

Sample data:

Exception: possible error, token info not found for pools
[MissingTokenInfo(token0='0x4200000000000000000000000000000000000006', token1='0x557929D1d015F4B30E9C0E68cDF6e8a86B308E18', pool_type=-1),
 MissingTokenInfo(token0='0x2C00f7fF36c3591340b7DB47c22C4a413b925E5F', token1='0x4200000000000000000000000000000000000006', pool_type=-1),
 MissingTokenInfo(token0='0x4200000000000000000000000000000000000006', token1='0xF6A35217F35220efF1e3E1c489C30817245bc7cf', pool_type=-1),
 MissingTokenInfo(token0='0x4200000000000000000000000000000000000006', token1='0x92A062bB26A0Cbc704BF7dcD4C833d4e1beeb83d', pool_type=-1),
 MissingTokenInfo(token0='0x505269DA224Db7C3497A9799Fee06BbfbD0D40D2', token1='0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913', pool_type=-1),
 MissingTokenInfo(token0='0x06ef35365c3FA9125E94b4657E671FDaA0cB7B70', token1='0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913', pool_type=-1),
 ...]

Collaborator:

I brought this up with the Velodrome team on Telegram. At first, setting listed_only=False made all pools with missing tokens go away. We should check with them again if that isn't working anymore.

@lithium323 (Contributor, Author):

I've tried with both "listed_only" options, and this issue persists. I think this requires patching on the sugar-sdk side.


if lp is None:
missing.append(
44 changes: 44 additions & 0 deletions src/op_analytics/datasources/velodrome/tokens.py
@@ -0,0 +1,44 @@
from op_analytics.coreutils.logger import structlog
import polars as pl

from op_analytics.datasources.velodrome.sugarwrapper import fetch_pools
from op_analytics.datasources.velodrome.chain_list import chain_cls_to_str

log = structlog.get_logger()

TOKEN_SCHEMA = {
"chain": pl.Utf8,
"token_address": pl.Utf8,
"symbol": pl.Utf8,
"decimals": pl.Int64,
"listed": pl.Boolean,
}


def fetch_tokens_for_chain(chain_cls: type) -> pl.DataFrame:
"""
Fetch token metadata for a single chain by leveraging sugarwrapper.py's fetch_pools().
Returns a Polars DataFrame matching TOKEN_SCHEMA.
"""
chain_str = chain_cls_to_str(chain_cls)
log.info(f"Fetching tokens for {chain_str} via sugarwrapper fetch_pools()")

# fetch_pools() returns a VelodromePools dataclass; we only need tokens here
velodrome_pools = fetch_pools(chain_str)
tokens = velodrome_pools.tokens # a list of sugar.token.Token objects

token_records = []
for t in tokens:
token_records.append(
{
"chain": chain_str,
"token_address": t.token_address,
"symbol": t.symbol,
"decimals": t.decimals,
"listed": t.listed, # if the Token object has a 'listed' attribute
}
)

df = pl.DataFrame(token_records, schema=TOKEN_SCHEMA)
log.info(f"{chain_str} returned {df.height} tokens.")
return df
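
A small downstream sketch, relevant to the listed_only discussion above: counting listed vs. unlisted tokens per chain.

# Illustrative only: tally tokens by the 'listed' flag.
from sugar.chains import AsyncBaseChain

tokens_df = fetch_tokens_for_chain(AsyncBaseChain)
print(tokens_df.group_by("chain", "listed").len())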