diff --git a/src/op_analytics/datasources/chainsmeta/erc20tokens/RPC_ERROR_HANDLING.md b/src/op_analytics/datasources/chainsmeta/erc20tokens/RPC_ERROR_HANDLING.md
new file mode 100644
index 00000000000..686f965429e
--- /dev/null
+++ b/src/op_analytics/datasources/chainsmeta/erc20tokens/RPC_ERROR_HANDLING.md
@@ -0,0 +1,104 @@
+# RPC Error Handling Improvements
+
+## Problem
+
+The Dagster pipeline was failing with SSL errors when trying to connect to RPC endpoints:
+
+```
+requests.exceptions.SSLError: HTTPSConnectionPool(host='rpc.ham.fun', port=443): Max retries exceeded with url: / (Caused by SSLError(SSLError(1, '[SSL: SSLV3_ALERT_HANDSHAKE_FAILURE] sslv3 alert handshake failure (_ssl.c:1010)')))
+```
+
+The existing retry logic only handled `RateLimit` exceptions; SSL errors and other connection issues were never caught or retried, so an entire chain would fail whenever its RPC endpoint was unreachable.
+
+## Solution
+
+I implemented comprehensive RPC error handling across the codebase:
+
+### 1. Enhanced Token-Level Error Handling (`tokens.py`)
+
+- **Added new exception class**: `RPCConnectionError` for connection-related issues
+- **Extended retry decorator**: Now retries on both `RateLimit` and `RPCConnectionError` exceptions
+- **Added connection error catching**: Catches SSL, connection, timeout, and request exceptions
+- **Improved error logging**: Better error messages with context
+
+```python
+@stamina.retry(on=(RateLimit, RPCConnectionError), attempts=3, wait_initial=10)
+def call_rpc(self, ...):
+    try:
+        response = session.post(rpc_endpoint, json=self.rpc_batch())
+    except (SSLError, ConnectionError, Timeout, RequestException) as ex:
+        log.warning(f"RPC connection error for {self} at {rpc_endpoint}: {ex}")
+        raise RPCConnectionError(f"Connection error to RPC endpoint: {ex}") from ex
+```
+
+### 2. Enhanced Chain-Level Error Handling (`chaintokens.py`)
+
+- **Added exception handling**: Catches `RPCConnectionError` and other exceptions during token processing
+- **Graceful degradation**: Continues processing other tokens even if individual tokens fail
+- **Better error logging**: Logs connection errors at debug level to avoid spam
+
+```python
+try:
+    token_metadata = token.call_rpc(...)
+    # ... process token metadata
+except RPCConnectionError as e:
+    batch_failed += 1
+    total_failed += 1
+    log.debug(f"skipped token due to RPC connection error: {token}, {e}")
+    continue
+except Exception as e:
+    batch_failed += 1
+    total_failed += 1
+    log.warning(f"skipped token due to unexpected error: {token}, {e}")
+    continue
+```
+
+### 3. Enhanced Job-Level Error Handling (`execute.py`)
+
+- **Switched to failure-tolerant execution**: Uses `run_concurrently_store_failures` instead of `run_concurrently`
+- **Graceful chain skipping**: Continues processing other chains even if some chains fail
+- **Comprehensive logging**: Logs failed chains but doesn't fail the entire job
+
+```python
+summary = run_concurrently_store_failures(
+    function=_fetch,
+    targets=targets,
+    max_workers=16,
+)
+
+if summary.failures:
+    for chain, error_msg in summary.failures.items():
+        detail = f"failed chain={chain}: error={error_msg}"
+        log.error(detail)
+
+    log.warning(f"{len(summary.failures)} chains failed to execute, but continuing with successful chains")
+```
+
+### 4. Enhanced System Config Error Handling
+
+Applied the same improvements to the system config RPC handling:
+
+- **Updated `../systemconfig/rpc.py`**: Added `RPCConnectionError` and connection error handling
+- **Updated `../systemconfig/chain.py`**: Added proper error handling for connection errors
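+
+The system config fetch has no per-token granularity, so the change is just the same two pieces as in `tokens.py`: the widened retry decorator and the mapping of `requests` exceptions to `RPCConnectionError`. Abridged sketch (the full diffs for both files appear further down in this PR):
+
+```python
+@stamina.retry(on=(RateLimit, RPCConnectionError), attempts=3, wait_initial=10)
+def call_rpc(self, rpc_endpoint: str, ...):
+    try:
+        response = session.post(rpc_endpoint, json=self.rpc_batch())
+    except (SSLError, ConnectionError, Timeout, RequestException) as ex:
+        raise RPCConnectionError(f"Connection error to RPC endpoint: {ex}") from ex
+```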
+
+## Benefits
+
+1. **Resilience**: The pipeline now continues processing even when individual RPC endpoints are unreachable
+2. **Better monitoring**: Failed chains are logged with detailed error messages
+3. **Graceful degradation**: Partial success is better than complete failure
+4. **Consistent retry logic**: All RPC calls now have consistent error handling and retry behavior
+
+## Monitoring
+
+The improved logging will help monitor RPC health:
+
+- **Chain failures**: Logged as errors with chain name and error details
+- **Token failures**: Logged at debug level to avoid log spam
+- **Success rates**: Reported for each chain and overall job
+
+## Future Improvements
+
+1. **RPC health monitoring**: Track RPC endpoint availability over time
+2. **Automatic fallback**: Switch to backup RPC endpoints when primary fails
+3. **Circuit breaker**: Temporarily disable problematic RPC endpoints (see the sketch below)
+4. **Metrics**: Add metrics for RPC success/failure rates
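+
+A minimal sketch of the circuit-breaker idea. Nothing here exists in the codebase yet; `CircuitBreaker`, the failure threshold, and the cooldown are all illustrative. A shared instance would be consulted before each `call_rpc` and updated from its success/failure paths:
+
+```python
+import time
+
+
+class CircuitBreaker:
+    """Skip an RPC endpoint after repeated failures; retry it after a cooldown."""
+
+    def __init__(self, max_failures: int = 5, cooldown_seconds: float = 300.0):
+        self.max_failures = max_failures
+        self.cooldown_seconds = cooldown_seconds
+        self.failures: dict[str, int] = {}
+        self.opened_at: dict[str, float] = {}
+
+    def available(self, endpoint: str) -> bool:
+        opened = self.opened_at.get(endpoint)
+        if opened is None:
+            return True
+        if time.monotonic() - opened > self.cooldown_seconds:
+            # Cooldown elapsed: close the breaker and try the endpoint again.
+            del self.opened_at[endpoint]
+            self.failures[endpoint] = 0
+            return True
+        return False
+
+    def record_failure(self, endpoint: str) -> None:
+        self.failures[endpoint] = self.failures.get(endpoint, 0) + 1
+        if self.failures[endpoint] >= self.max_failures:
+            self.opened_at[endpoint] = time.monotonic()
+
+    def record_success(self, endpoint: str) -> None:
+        self.failures[endpoint] = 0
+```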
\ No newline at end of file
diff --git a/src/op_analytics/datasources/chainsmeta/erc20tokens/chaintokens.py b/src/op_analytics/datasources/chainsmeta/erc20tokens/chaintokens.py
index d74883f143f..dd857bd8e2e 100644
--- a/src/op_analytics/datasources/chainsmeta/erc20tokens/chaintokens.py
+++ b/src/op_analytics/datasources/chainsmeta/erc20tokens/chaintokens.py
@@ -6,7 +6,7 @@
 from op_analytics.coreutils.logger import bound_contextvars, structlog
 from op_analytics.coreutils.request import new_session
 
-from .tokens import Token
+from .tokens import Token, RPCConnectionError
 
 log = structlog.get_logger()
 
@@ -68,39 +68,72 @@ def fetch(self, process_dt: date):
         client = init_client("OPLABS")
 
         total_written_rows = 0
+        total_processed = 0
+        total_failed = 0
+
         with bound_contextvars(chain=self.chain):
             for batch in itertools.batched(self.tokens, n=30):
                 data = []
+                batch_failed = 0
                 for token in batch:
-                    token_metadata = token.call_rpc(
-                        rpc_endpoint=self.rpc_endpoint,
-                        session=session,
-                        speed_bump=SPEED_BUMP.get(
-                            token.chain, DEFAULT_SPEED_BUMP
-                        ),  # avoid hitting the RPC rate limit
-                    )
-
-                    if token_metadata is None:
-                        # an error was encountered
-                        log.warning(f"error encountered for: {token}")
+                    total_processed += 1
+                    try:
+                        token_metadata = token.call_rpc(
+                            rpc_endpoint=self.rpc_endpoint,
+                            session=session,
+                            speed_bump=SPEED_BUMP.get(
+                                token.chain, DEFAULT_SPEED_BUMP
+                            ),  # avoid hitting the RPC rate limit
+                        )
+
+                        if token_metadata is None:
+                            # an error was encountered
+                            batch_failed += 1
+                            total_failed += 1
+                            log.debug(
+                                f"skipped token due to missing functions or parsing errors: {token}"
+                            )
+                            continue
+
+                        row = token_metadata.to_dict()
+                        row["process_dt"] = process_dt
+                        data.append([row[_] for _ in COLUMNS])
+                    except RPCConnectionError as e:
+                        batch_failed += 1
+                        total_failed += 1
+                        log.debug(f"skipped token due to RPC connection error: {token}, {e}")
+                        continue
+                    except Exception as e:
+                        batch_failed += 1
+                        total_failed += 1
+                        log.warning(f"skipped token due to unexpected error: {token}, {e}")
                         continue
 
-                    row = token_metadata.to_dict()
-                    row["process_dt"] = process_dt
-                    data.append([row[_] for _ in COLUMNS])
-
-                log.info(f"fetched token metadata from rpc for {len(batch)} tokens")
-
-                result = client.insert(
-                    table="chainsmeta.fact_erc20_token_metadata_v1",
-                    data=data,
-                    column_names=COLUMNS,
-                    column_type_names=COLUMN_TYPE_NAMES,
-                )
                 log.info(
-                    f"inserted token medatata {len(batch)} tokens, {result.written_rows} written rows"
+                    f"fetched token metadata from rpc for {len(batch)} tokens (failed: {batch_failed})"
                 )
-                total_written_rows += result.written_rows
+
+                if data:  # Only insert if we have data
+                    result = client.insert(
+                        table="chainsmeta.fact_erc20_token_metadata_v1",
+                        data=data,
+                        column_names=COLUMNS,
+                        column_type_names=COLUMN_TYPE_NAMES,
+                    )
+                    log.info(
+                        f"inserted token metadata for {len(data)} tokens, {result.written_rows} written rows"
+                    )
+                    total_written_rows += result.written_rows
+
+        # Log final statistics
+        success_rate = (
+            ((total_processed - total_failed) / total_processed * 100) if total_processed > 0 else 0
+        )
+        log.info(
+            f"token metadata processing complete for {self.chain}: "
+            f"processed={total_processed}, failed={total_failed}, "
+            f"success_rate={success_rate:.1f}%, written_rows={total_written_rows}"
+        )
 
         return total_written_rows
diff --git a/src/op_analytics/datasources/chainsmeta/erc20tokens/execute.py b/src/op_analytics/datasources/chainsmeta/erc20tokens/execute.py
index b85c70c4fc5..e16558a371f 100644
--- a/src/op_analytics/datasources/chainsmeta/erc20tokens/execute.py
+++ b/src/op_analytics/datasources/chainsmeta/erc20tokens/execute.py
@@ -1,7 +1,7 @@
 from datetime import date
 
-from op_analytics.coreutils.threads import run_concurrently
+from op_analytics.coreutils.threads import run_concurrently_store_failures
 from op_analytics.coreutils.time import now_date
 
 from op_analytics.coreutils.logger import structlog
@@ -23,13 +23,30 @@ def execute_pull(process_dt: date | None = None):
     def _fetch(x: ChainTokens):
         return x.fetch(process_dt=process_dt)
 
-    # Run each chain on a separate thread.
-    results = run_concurrently(
+    # Run each chain on a separate thread, but continue even if some chains fail.
+    summary = run_concurrently_store_failures(
         function=_fetch,
         targets=targets,
        max_workers=16,
     )
 
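+    # `summary.results` maps each successful chain to its return value (rows written)
+    # and `summary.failures` maps each failed chain to an error message string, as
+    # the usage below relies on.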
+    # Log any chain failures but don't fail the entire job
+    if summary.failures:
+        for chain, error_msg in summary.failures.items():
+            detail = f"failed chain={chain}: error={error_msg}"
+            log.error(detail)
+
+        log.warning(
+            f"{len(summary.failures)} chains failed to execute, but continuing with successful chains"
+        )
+
+    # Log success summary
+    successful_chains = len(summary.results)
+    total_chains = len(targets)
+    log.info(
+        f"ERC20 tokens processing completed: {successful_chains}/{total_chains} chains successful"
+    )
+
     # TODO:
     # Here we can copy over from the clickhouse dimension table to GCS.
     # As we migrate out of BigQuery it maybe useful to have a copy of the token
@@ -37,4 +54,4 @@ def _fetch(x: ChainTokens):
     # ChainsMeta.ERC20_TOKEN_METADATA.write(dataframe=df.with_columns(dt=pl.lit(DEFAULT_DT)))
 
     # This will be a dictionary from chain to total rows inserted.
-    return results
+    return summary.results
diff --git a/src/op_analytics/datasources/chainsmeta/erc20tokens/tokens.py b/src/op_analytics/datasources/chainsmeta/erc20tokens/tokens.py
index 237faf874ee..55614924270 100644
--- a/src/op_analytics/datasources/chainsmeta/erc20tokens/tokens.py
+++ b/src/op_analytics/datasources/chainsmeta/erc20tokens/tokens.py
@@ -4,7 +4,13 @@
 import requests
 import stamina
-from requests.exceptions import JSONDecodeError
+from requests.exceptions import (
+    JSONDecodeError,
+    SSLError,
+    ConnectionError,
+    Timeout,
+    RequestException,
+)
 
 from op_analytics.coreutils.logger import structlog
 from op_analytics.coreutils.request import new_session
@@ -42,6 +48,12 @@ class RateLimit(Exception):
     pass
 
 
+class RPCConnectionError(Exception):
+    """Raised when there's a connection error to the RPC endpoint."""
+
+    pass
+
+
 class TokenMetadataError(Exception):
     """Raised when we fail to parse the RPC response."""
 
@@ -93,7 +105,7 @@ def rpc_batch(self):
 
         return batch
 
-    @stamina.retry(on=RateLimit, attempts=3, wait_initial=10)
+    @stamina.retry(on=(RateLimit, RPCConnectionError), attempts=3, wait_initial=10)
     def call_rpc(
         self,
         rpc_endpoint: str | None = None,
@@ -104,14 +116,22 @@ def call_rpc(
         rpc_endpoint = rpc_endpoint or get_rpc_for_chain(chain_id=self.chain_id)
 
         start = time.perf_counter()
-        response = session.post(rpc_endpoint, json=self.rpc_batch())
+        try:
+            response = session.post(rpc_endpoint, json=self.rpc_batch())
+        except (SSLError, ConnectionError, Timeout, RequestException) as ex:
+            log.warning(f"RPC connection error for {self} at {rpc_endpoint}: {ex}")
+            raise RPCConnectionError(f"Connection error to RPC endpoint: {ex}") from ex
 
         try:
            response_data = response.json()
         except JSONDecodeError as ex:
             raise TokenResponseError(dict(token=self)) from ex
 
-        result = TokenMetadata.of(token=self, response=response_data)
+        try:
+            result = TokenMetadata.of(token=self, response=response_data)
+        except TokenMetadataError as ex:
+            log.warning(f"failed to parse token metadata for {self}: {ex}")
+            return None
 
         ellapsed = time.perf_counter() - start
         if ellapsed < speed_bump:
@@ -142,15 +162,35 @@ def to_dict(self):
     @classmethod
     def of(cls, token: Token, response: list[dict]) -> Optional["TokenMetadata"]:
         data: dict[str, Any] = {}
+        failed_methods = []
+
         for item in response:
             if "error" in item:
                 code = item["error"].get("code")
                 if code == -32016:  # rate limit
                     raise RateLimit(f"JSON-RPC error: {item} [{token}]")
 
-                if code == -32000:  # "execution reverted"
-                    log.warning(f"rpc returned -32000 'execution reverted' {token}")
-                    return None
+                # Handle error codes that indicate missing functions or contract issues.
+                # JSON-RPC 2.0 reserves -32000 to -32099 for implementation-defined server
+                # errors; EVM nodes commonly return -32000 for "execution reverted", and the
+                # exact meanings of -32001 through -32005 vary by node. Code 3 is the EVM
+                # revert error with a reason string (e.g., "Contract does not have fallback
+                # nor receive functions").
+                if code in [
+                    -32000,
+                    -32001,
+                    -32002,
+                    -32003,
+                    -32004,
+                    -32005,
+                    3,
+                ]:  # execution reverted, method not found, contract issues, etc.
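+                    # The batch request ids double as method names ("decimals",
+                    # "symbol", ...), so record which call failed and keep scanning
+                    # the remaining results instead of bailing out.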
+                    method_name = item.get("id", "unknown")
+                    failed_methods.append(method_name)
+                    log.debug(f"method {method_name} failed with code {code} for {token}")
+                    continue
 
                 raise Exception(f"JSON-RPC error: {item} [{token}]")
 
@@ -167,6 +207,41 @@ def of(cls, token: Token, response: list[dict]) -> Optional["TokenMetadata"]:
             else:
                 raise Exception("invalid item id: " + item["id"])
 
+        # If we have failed methods, log them and check if we can still proceed
+        if failed_methods:
+            log.warning(
+                f"methods {failed_methods} failed for {token}, checking if we can still proceed"
+            )
+
+            # Check if we have the minimum required data to create token metadata
+            required_methods = ["decimals", "symbol", "name", "totalSupply"]
+            missing_required = [method for method in required_methods if method not in data]
+
+            if missing_required:
+                log.warning(
+                    f"missing required methods {missing_required} for {token}, skipping token"
+                )
+                return None
+
+        # Helper functions for safe decoding with defaults
+        def safe_decode(key: str, decode_type: str, default_value: Any = None):
+            if key in data:
+                try:
+                    return decode(decode_type, data[key])
+                except Exception as ex:
+                    log.warning(f"failed to decode {key} for {token}: {ex}")
+                    return default_value
+            return default_value
+
+        def safe_decode_string(key: str, default_value: str = "UNKNOWN"):
+            if key in data:
+                try:
+                    return decode_string(data[key])
+                except Exception as ex:
+                    log.warning(f"failed to decode string {key} for {token}: {ex}")
+                    return default_value
+            return default_value
+
         try:
             return cls(
                 chain=token.chain,
@@ -174,10 +249,10 @@ def of(cls, token: Token, response: list[dict]) -> Optional["TokenMetadata"]:
                 contract_address=token.contract_address,
                 block_number=data["block_number"],
                 block_timestamp=data["block_timestamp"],
-                decimals=decode("uint8", data["decimals"]),
-                symbol=decode_string(data["symbol"]),
-                name=decode_string(data["name"]),
-                total_supply=decode("uint256", data["totalSupply"]),
+                decimals=safe_decode("decimals", "uint8", 18),  # Default to 18 decimals
+                symbol=safe_decode_string("symbol", "UNKNOWN"),
+                name=safe_decode_string("name", "Unknown Token"),
+                total_supply=safe_decode("totalSupply", "uint256", 0),  # Default to 0
             )
         except Exception as ex:
             raise TokenMetadataError(dict(data=data, token=token)) from ex
diff --git a/src/op_analytics/datasources/chainsmeta/systemconfig/chain.py b/src/op_analytics/datasources/chainsmeta/systemconfig/chain.py
index 21a985ab417..a63ddb0c24d 100644
--- a/src/op_analytics/datasources/chainsmeta/systemconfig/chain.py
+++ b/src/op_analytics/datasources/chainsmeta/systemconfig/chain.py
@@ -5,7 +5,7 @@
 from op_analytics.coreutils.logger import structlog
 from op_analytics.coreutils.request import new_session
 
-from .rpc import RPCManager
+from .rpc import RPCManager, RPCConnectionError
 
 log = structlog.get_logger()
 
@@ -35,11 +35,14 @@ def fetch(
         session = session or new_session()
 
         system_config = RPCManager(system_config_proxy=system_config_proxy)
-        config_metadata = system_config.call_rpc(
-            rpc_endpoint=ETHEREUM_RPC_URL,
-            session=session,
-            speed_bump=DEFAULT_SPEED_BUMP,
-        )
+        try:
+            config_metadata = system_config.call_rpc(
+                rpc_endpoint=ETHEREUM_RPC_URL,
+                session=session,
+                speed_bump=DEFAULT_SPEED_BUMP,
+            )
+        except RPCConnectionError as e:
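+            # Re-raise as a chain-level failure: there is no per-token work to
+            # salvage here, so the caller should see the connection error.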
+            raise Exception(f"RPC connection error for chain {chain_id}: {e}") from e
 
         if config_metadata is None:
             raise Exception(f"error encountered for chain {chain_id}")
diff --git a/src/op_analytics/datasources/chainsmeta/systemconfig/rpc.py b/src/op_analytics/datasources/chainsmeta/systemconfig/rpc.py
index 7ce4361ba78..1ce6f200034 100644
--- a/src/op_analytics/datasources/chainsmeta/systemconfig/rpc.py
+++ b/src/op_analytics/datasources/chainsmeta/systemconfig/rpc.py
@@ -4,7 +4,13 @@
 import requests
 import stamina
-from requests.exceptions import JSONDecodeError
+from requests.exceptions import (
+    JSONDecodeError,
+    SSLError,
+    ConnectionError,
+    Timeout,
+    RequestException,
+)
 
 from op_analytics.coreutils.logger import structlog
 from op_analytics.coreutils.request import new_session
@@ -69,6 +75,12 @@ class RateLimit(Exception):
     pass
 
 
+class RPCConnectionError(Exception):
+    """Raised when there's a connection error to the RPC endpoint."""
+
+    pass
+
+
 class SystemConfigError(Exception):
     """Raised when we fail to parse the RPC response."""
 
@@ -110,7 +122,7 @@ def rpc_batch(self):
 
         return batch
 
-    @stamina.retry(on=RateLimit, attempts=3, wait_initial=10)
+    @stamina.retry(on=(RateLimit, RPCConnectionError), attempts=3, wait_initial=10)
     def call_rpc(
         self,
         rpc_endpoint: str,
@@ -120,7 +132,11 @@ def call_rpc(
         session = session or new_session()
 
         start = time.perf_counter()
-        response = session.post(rpc_endpoint, json=self.rpc_batch())
+        try:
+            response = session.post(rpc_endpoint, json=self.rpc_batch())
+        except (SSLError, ConnectionError, Timeout, RequestException) as ex:
+            log.warning(f"RPC connection error for {self} at {rpc_endpoint}: {ex}")
+            raise RPCConnectionError(f"Connection error to RPC endpoint: {ex}") from ex
 
         try:
             response_data = response.json()
diff --git a/tests/op_analytics/datasources/chainsmeta/erc20tokens/test_tokens.py b/tests/op_analytics/datasources/chainsmeta/erc20tokens/test_tokens.py
index fc4cf9016ce..e935bf58565 100644
--- a/tests/op_analytics/datasources/chainsmeta/erc20tokens/test_tokens.py
+++ b/tests/op_analytics/datasources/chainsmeta/erc20tokens/test_tokens.py
@@ -70,6 +70,139 @@ def test_decode_response():
     }
 
 
+def test_error_handling_missing_functions():
+    """Test that tokens with missing functions are handled gracefully."""
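+    # Each entry's "id" echoes the batch request id ("block", "decimals", "symbol",
+    # "name", "totalSupply"); TokenMetadata.of uses it to route results back to fields.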
+    # Test with error code 3 (contract doesn't have fallback/receive functions)
+    response_with_error_3: list[dict] = [
+        {
+            "jsonrpc": "2.0",
+            "result": {
+                "number": "0x1934a94",
+                "timestamp": "0x67b0f20b",
+            },
+            "id": "block",
+        },
+        {
+            "jsonrpc": "2.0",
+            "error": {
+                "code": 3,
+                "message": "execution reverted: Contract does not have fallback nor receive functions",
+                "data": "0x08c379a000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000035436f6e747261637420646f6573206e6f7420686176652066616c6c6261636b206e6f7220726563656976652066756e6374696f6e730000000000000000000000",
+            },
+            "id": "decimals",
+        },
+        {
+            "jsonrpc": "2.0",
+            "error": {
+                "code": 3,
+                "message": "execution reverted: Contract does not have fallback nor receive functions",
+            },
+            "id": "symbol",
+        },
+        {
+            "jsonrpc": "2.0",
+            "error": {
+                "code": 3,
+                "message": "execution reverted: Contract does not have fallback nor receive functions",
+            },
+            "id": "name",
+        },
+        {
+            "jsonrpc": "2.0",
+            "error": {
+                "code": 3,
+                "message": "execution reverted: Contract does not have fallback nor receive functions",
+            },
+            "id": "totalSupply",
+        },
+    ]
+
+    token = Token(
+        chain="base", chain_id=8453, contract_address="0xece616e29f54d124542e8d7ae5296f73640f8002"
+    )
+    result = TokenMetadata.of(token, response_with_error_3)
+
+    # Should return None because all required methods failed
+    assert result is None
+
+    # Test with error code -32000 (execution reverted)
+    response_with_error_32000: list[dict] = [
+        {
+            "jsonrpc": "2.0",
+            "result": {
+                "number": "0x1934a94",
+                "timestamp": "0x67b0f20b",
+            },
+            "id": "block",
+        },
+        {
+            "jsonrpc": "2.0",
+            "error": {"code": -32000, "message": "execution reverted"},
+            "id": "decimals",
+        },
+        {
+            "jsonrpc": "2.0",
+            "result": "0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000054265744d65000000000000000000000000000000000000000000000000000000",
+            "id": "symbol",
+        },
+        {
+            "jsonrpc": "2.0",
+            "result": "0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000054265744d65000000000000000000000000000000000000000000000000000000",
+            "id": "name",
+        },
+        {
+            "jsonrpc": "2.0",
+            "result": "0x0000000000000000000000000000000000000000033b2e3c9fd0803ce8000000",
+            "id": "totalSupply",
+        },
+    ]
+
+    result = TokenMetadata.of(token, response_with_error_32000)
+
+    # Should return None because decimals method failed (required)
+    assert result is None
+
+    # Test with partial success (some methods work, some fail)
+    response_partial_success: list[dict] = [
+        {
+            "jsonrpc": "2.0",
+            "result": {
+                "number": "0x1934a94",
+                "timestamp": "0x67b0f20b",
+            },
+            "id": "block",
+        },
+        {
+            "jsonrpc": "2.0",
+            "result": "0x0000000000000000000000000000000000000000000000000000000000000012",
+            "id": "decimals",
+        },
+        {
+            "jsonrpc": "2.0",
+            "result": "0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000054265744d65000000000000000000000000000000000000000000000000000000",
+            "id": "symbol",
+        },
+        {
+            "jsonrpc": "2.0",
+            "error": {
+                "code": 3,
+                "message": "execution reverted: Contract does not have fallback nor receive functions",
+            },
+            "id": "name",
+        },
+        {
+            "jsonrpc": "2.0",
+            "result": "0x0000000000000000000000000000000000000000033b2e3c9fd0803ce8000000",
+            "id": "totalSupply",
+        },
+    ]
+
+    result = TokenMetadata.of(token, response_partial_success)
+
+    # Should return None because name method failed (required)
+    assert result is None
+
+
 def test_invalid_strings():
     """Some tokens have invalid utf-8 on their symbol or name."""