# RPC Error Handling Improvements

## Problem

The Dagster pipeline was failing with SSL errors when trying to connect to RPC endpoints:

```
requests.exceptions.SSLError: HTTPSConnectionPool(host='rpc.ham.fun', port=443): Max retries exceeded with url: / (Caused by SSLError(SSLError(1, '[SSL: SSLV3_ALERT_HANDSHAKE_FAILURE] sslv3 alert handshake failure (_ssl.c:1010)')))
```

The existing retry logic only handled `RateLimit` exceptions; SSL errors and other connection failures were never caught or retried, so an entire chain would fail whenever its RPC endpoint was unreachable.
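For context, the retry decorator on `call_rpc` previously covered only rate limits — roughly the shape sketched below (assumed for illustration; the exact original parameters may differ) — so an `SSLError` raised inside `session.post` was never matched by the decorator and bubbled straight up, failing the whole chain:

```python
# Assumed previous shape (illustrative), before this change: only RateLimit
# triggers a retry, so an SSLError raised by session.post() propagates
# immediately instead of being retried.
@stamina.retry(on=RateLimit, attempts=3, wait_initial=10)
def call_rpc(self, ...):
    response = session.post(rpc_endpoint, json=self.rpc_batch())
```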

## Solution

I implemented comprehensive RPC error handling across the codebase:

### 1. Enhanced Token-Level Error Handling (`tokens.py`)

- **Added new exception class**: `RPCConnectionError` for connection-related issues
- **Extended retry decorator**: Now retries on both `RateLimit` and `RPCConnectionError` exceptions
- **Added connection error catching**: Catches SSL, connection, timeout, and request exceptions
- **Improved error logging**: Error messages now include the token and the RPC endpoint for easier debugging

```python
@stamina.retry(on=(RateLimit, RPCConnectionError), attempts=3, wait_initial=10)
def call_rpc(self, ...):
    try:
        response = session.post(rpc_endpoint, json=self.rpc_batch())
    except (SSLError, ConnectionError, Timeout, RequestException) as ex:
        log.warning(f"RPC connection error for {self} at {rpc_endpoint}: {ex}")
        raise RPCConnectionError(f"Connection error to RPC endpoint: {ex}") from ex
```
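The new exception class itself needs no behavior of its own; a minimal sketch, assuming it simply subclasses `Exception` so that `stamina` can match on it:

```python
class RPCConnectionError(Exception):
    """Raised when an RPC endpoint cannot be reached (SSL, connection, or timeout errors)."""
```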

### 2. Enhanced Chain-Level Error Handling (`chaintokens.py`)

- **Added exception handling**: Catches `RPCConnectionError` and other exceptions during token processing
- **Graceful degradation**: Continues processing other tokens even if individual tokens fail
- **Better error logging**: Logs connection errors at debug level to avoid spam

```python
try:
    token_metadata = token.call_rpc(...)
    # ... process token metadata
except RPCConnectionError as e:
    batch_failed += 1
    total_failed += 1
    log.debug(f"skipped token due to RPC connection error: {token}, {e}")
    continue
except Exception as e:
    batch_failed += 1
    total_failed += 1
    log.warning(f"skipped token due to unexpected error: {token}, {e}")
    continue
```

### 3. Enhanced Job-Level Error Handling (`execute.py`)

- **Switched to failure-tolerant execution**: Uses `run_concurrently_store_failures` instead of `run_concurrently`
- **Graceful chain skipping**: Continues processing other chains even if some chains fail
- **Comprehensive logging**: Logs failed chains but doesn't fail the entire job

```python
summary = run_concurrently_store_failures(
    function=_fetch,
    targets=targets,
    max_workers=16,
)

if summary.failures:
    for chain, error_msg in summary.failures.items():
        detail = f"failed chain={chain}: error={error_msg}"
        log.error(detail)

    log.warning(f"{len(summary.failures)} chains failed to execute, but continuing with successful chains")
```
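This relies on the summary returned by `run_concurrently_store_failures` keeping successes and failures separate. A hypothetical sketch of that shape, inferred only from the usage above (the real helper in `op_analytics.coreutils.threads` may differ):

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class RunSummary:
    # Hypothetical shape inferred from usage: successful results keyed by
    # target name (here, chain -> rows written), plus an error message for
    # each failed target.
    results: dict[str, Any] = field(default_factory=dict)
    failures: dict[str, str] = field(default_factory=dict)
```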

### 4. Enhanced System Config Error Handling

Applied the same improvements to the system config RPC handling (a sketch of the pattern follows this list):

- **Updated `../systemconfig/rpc.py`**: Added `RPCConnectionError` and connection error handling
- **Updated `../systemconfig/chain.py`**: Added proper error handling for connection errors
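A rough sketch of how the same wrap-and-retry pattern looks on the system config side (the function and variable names here are illustrative, not the actual `systemconfig/rpc.py` code):

```python
@stamina.retry(on=(RateLimit, RPCConnectionError), attempts=3, wait_initial=10)
def call_system_config_rpc(session, rpc_endpoint, payload):
    # Illustrative only: transport-level failures are wrapped in RPCConnectionError
    # so the retry decorator handles them the same way as rate limits.
    try:
        return session.post(rpc_endpoint, json=payload)
    except (SSLError, ConnectionError, Timeout, RequestException) as ex:
        raise RPCConnectionError(f"Connection error to RPC endpoint: {ex}") from ex
```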

## Benefits

1. **Resilience**: The pipeline now continues processing even when individual RPC endpoints are unreachable
2. **Better monitoring**: Failed chains are logged with detailed error messages
3. **Graceful degradation**: Partial success is better than complete failure
4. **Consistent retry logic**: All RPC calls now have consistent error handling and retry behavior

## Monitoring

The improved logging will help monitor RPC health:

- **Chain failures**: Logged as errors with chain name and error details
- **Token failures**: Logged at debug level to avoid log spam
- **Success rates**: Reported for each chain and overall job
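
For example, a chain whose endpoint is unreachable should surface in the job logs roughly like this (illustrative values composed from the log formats above, not captured output):

```
failed chain=ham: error=Connection error to RPC endpoint: Max retries exceeded with url: /
1 chains failed to execute, but continuing with successful chains
```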

## Future Improvements

1. **RPC health monitoring**: Track RPC endpoint availability over time
2. **Automatic fallback**: Switch to backup RPC endpoints when the primary fails (see the sketch after this list)
3. **Circuit breaker**: Temporarily disable problematic RPC endpoints
4. **Metrics**: Add metrics for RPC success/failure rates
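
As an illustration of where items 2 and 3 could go, here is a rough sketch of endpoint fallback with a naive circuit breaker (entirely hypothetical, not part of this change):

```python
import time


class RPCEndpointPool:
    """Hypothetical helper: rotate to backup endpoints and temporarily skip
    endpoints that keep failing (a naive circuit breaker)."""

    def __init__(self, endpoints: list[str], cooldown_seconds: float = 300.0):
        self.endpoints = endpoints
        self.cooldown_seconds = cooldown_seconds
        self.disabled_until: dict[str, float] = {}

    def pick(self) -> str:
        # Return the first endpoint that is not in its cooldown window.
        now = time.monotonic()
        for endpoint in self.endpoints:
            if self.disabled_until.get(endpoint, 0.0) <= now:
                return endpoint
        # Everything is cooling down; fall back to the primary endpoint.
        return self.endpoints[0]

    def report_failure(self, endpoint: str) -> None:
        # Open the circuit for this endpoint for the cooldown period.
        self.disabled_until[endpoint] = time.monotonic() + self.cooldown_seconds
```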

Changes to `chaintokens.py`:

@@ -6,7 +6,7 @@
 from op_analytics.coreutils.logger import bound_contextvars, structlog
 from op_analytics.coreutils.request import new_session
 
-from .tokens import Token
+from .tokens import Token, RPCConnectionError
 
 log = structlog.get_logger()
 
@@ -68,39 +68,72 @@ def fetch(self, process_dt: date):
         client = init_client("OPLABS")
 
         total_written_rows = 0
+        total_processed = 0
+        total_failed = 0
 
         with bound_contextvars(chain=self.chain):
             for batch in itertools.batched(self.tokens, n=30):
                 data = []
+                batch_failed = 0
 
                 for token in batch:
-                    token_metadata = token.call_rpc(
-                        rpc_endpoint=self.rpc_endpoint,
-                        session=session,
-                        speed_bump=SPEED_BUMP.get(
-                            token.chain, DEFAULT_SPEED_BUMP
-                        ),  # avoid hitting the RPC rate limit
-                    )
-
-                    if token_metadata is None:
-                        # an error was encountered
-                        log.warning(f"error encountered for: {token}")
-                        continue
-
-                    row = token_metadata.to_dict()
-                    row["process_dt"] = process_dt
-                    data.append([row[_] for _ in COLUMNS])
-
-                log.info(f"fetched token metadata from rpc for {len(batch)} tokens")
-
-                result = client.insert(
-                    table="chainsmeta.fact_erc20_token_metadata_v1",
-                    data=data,
-                    column_names=COLUMNS,
-                    column_type_names=COLUMN_TYPE_NAMES,
-                )
+                    total_processed += 1
+                    try:
+                        token_metadata = token.call_rpc(
+                            rpc_endpoint=self.rpc_endpoint,
+                            session=session,
+                            speed_bump=SPEED_BUMP.get(
+                                token.chain, DEFAULT_SPEED_BUMP
+                            ),  # avoid hitting the RPC rate limit
+                        )
+
+                        if token_metadata is None:
+                            # an error was encountered
+                            batch_failed += 1
+                            total_failed += 1
+                            log.debug(
+                                f"skipped token due to missing functions or parsing errors: {token}"
+                            )
+                            continue
+
+                        row = token_metadata.to_dict()
+                        row["process_dt"] = process_dt
+                        data.append([row[_] for _ in COLUMNS])
+                    except RPCConnectionError as e:
+                        batch_failed += 1
+                        total_failed += 1
+                        log.debug(f"skipped token due to RPC connection error: {token}, {e}")
+                        continue
+                    except Exception as e:
+                        batch_failed += 1
+                        total_failed += 1
+                        log.warning(f"skipped token due to unexpected error: {token}, {e}")
+                        continue
+
                 log.info(
-                    f"inserted token medatata {len(batch)} tokens, {result.written_rows} written rows"
+                    f"fetched token metadata from rpc for {len(batch)} tokens (failed: {batch_failed})"
                 )
-                total_written_rows += result.written_rows
+
+                if data:  # Only insert if we have data
+                    result = client.insert(
+                        table="chainsmeta.fact_erc20_token_metadata_v1",
+                        data=data,
+                        column_names=COLUMNS,
+                        column_type_names=COLUMN_TYPE_NAMES,
+                    )
+                    log.info(
+                        f"inserted token metadata {len(data)} tokens, {result.written_rows} written rows"
+                    )
+                    total_written_rows += result.written_rows
+
+        # Log final statistics
+        success_rate = (
+            ((total_processed - total_failed) / total_processed * 100) if total_processed > 0 else 0
+        )
+        log.info(
+            f"token metadata processing complete for {self.chain}: "
+            f"processed={total_processed}, failed={total_failed}, "
+            f"success_rate={success_rate:.1f}%, written_rows={total_written_rows}"
+        )
 
         return total_written_rows
25 changes: 21 additions & 4 deletions src/op_analytics/datasources/chainsmeta/erc20tokens/execute.py

@@ -1,7 +1,7 @@
 from datetime import date
 
 
-from op_analytics.coreutils.threads import run_concurrently
+from op_analytics.coreutils.threads import run_concurrently_store_failures
 from op_analytics.coreutils.time import now_date
 from op_analytics.coreutils.logger import structlog
 
@@ -23,18 +23,35 @@ def execute_pull(process_dt: date | None = None):
     def _fetch(x: ChainTokens):
         return x.fetch(process_dt=process_dt)
 
-    # Run each chain on a separate thread.
-    results = run_concurrently(
+    # Run each chain on a separate thread, but continue even if some chains fail.
+    summary = run_concurrently_store_failures(
         function=_fetch,
         targets=targets,
         max_workers=16,
     )
 
+    # Log any chain failures but don't fail the entire job
+    if summary.failures:
+        for chain, error_msg in summary.failures.items():
+            detail = f"failed chain={chain}: error={error_msg}"
+            log.error(detail)
+
+        log.warning(
+            f"{len(summary.failures)} chains failed to execute, but continuing with successful chains"
+        )
+
+    # Log success summary
+    successful_chains = len(summary.results)
+    total_chains = len(targets)
+    log.info(
+        f"ERC20 tokens processing completed: {successful_chains}/{total_chains} chains successful"
+    )
+
     # TODO:
     # Here we can copy over from the clickhouse dimension table to GCS.
     # As we migrate out of BigQuery it maybe useful to have a copy of the token
     # metdata that we can read from BigQuery.
     # ChainsMeta.ERC20_TOKEN_METADATA.write(dataframe=df.with_columns(dt=pl.lit(DEFAULT_DT)))
 
     # This will be a dictionary from chain to total rows inserted.
-    return results
+    return summary.results