@@ -7,19 +7,17 @@ use fnv::FnvHashMap;
7
7
use lighthouse_network:: PeerId ;
8
8
use lighthouse_network:: service:: api_types:: { CustodyId , DataColumnsByRootRequester } ;
9
9
use lighthouse_tracing:: SPAN_OUTGOING_CUSTODY_REQUEST ;
10
- use lru_cache:: LRUTimeCache ;
11
10
use parking_lot:: RwLock ;
12
- use rand:: Rng ;
13
11
use std:: collections:: HashSet ;
12
+ use std:: hash:: { BuildHasher , RandomState } ;
14
13
use std:: time:: { Duration , Instant } ;
15
14
use std:: { collections:: HashMap , marker:: PhantomData , sync:: Arc } ;
16
- use tracing:: { Span , debug, debug_span, field , warn} ;
15
+ use tracing:: { Span , debug, debug_span, warn} ;
17
16
use types:: { DataColumnSidecar , Hash256 , data_column_sidecar:: ColumnIndex } ;
18
17
use types:: { DataColumnSidecarList , EthSpec } ;
19
18
20
19
use super :: { LookupRequestResult , PeerGroup , RpcResponseResult , SyncNetworkContext } ;
21
20
22
- const FAILED_PEERS_CACHE_EXPIRY_SECONDS : u64 = 5 ;
23
21
const MAX_STALE_NO_PEERS_DURATION : Duration = Duration :: from_secs ( 30 ) ;
24
22
25
23
pub struct ActiveCustodyRequest < T : BeaconChainTypes > {
@@ -30,9 +28,7 @@ pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
30
28
/// Active requests for 1 or more columns each
31
29
active_batch_columns_requests :
32
30
FnvHashMap < DataColumnsByRootRequestId , ActiveBatchColumnsRequest > ,
33
- /// Peers that have recently failed to successfully respond to a columns by root request.
34
- /// Having a LRUTimeCache allows this request to not have to track disconnecting peers.
35
- failed_peers : LRUTimeCache < PeerId > ,
31
+ peer_attempts : HashMap < PeerId , usize > ,
36
32
/// Set of peers that claim to have imported this block and their custody columns
37
33
lookup_peers : Arc < RwLock < HashSet < PeerId > > > ,
38
34
/// Span for tracing the lifetime of this request.
@@ -71,7 +67,11 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
71
67
column_indices : & [ ColumnIndex ] ,
72
68
lookup_peers : Arc < RwLock < HashSet < PeerId > > > ,
73
69
) -> Self {
74
- let span = debug_span ! ( parent: None , SPAN_OUTGOING_CUSTODY_REQUEST , %block_root) ;
70
+ let span = debug_span ! (
71
+ parent: Span :: current( ) ,
72
+ SPAN_OUTGOING_CUSTODY_REQUEST ,
73
+ %block_root,
74
+ ) ;
75
75
Self {
76
76
block_root,
77
77
custody_id,
@@ -81,7 +81,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
81
81
. map ( |index| ( * index, ColumnRequest :: new ( ) ) ) ,
82
82
) ,
83
83
active_batch_columns_requests : <_ >:: default ( ) ,
84
- failed_peers : LRUTimeCache :: new ( Duration :: from_secs ( FAILED_PEERS_CACHE_EXPIRY_SECONDS ) ) ,
84
+ peer_attempts : HashMap :: new ( ) ,
85
85
lookup_peers,
86
86
span,
87
87
_phantom : PhantomData ,
@@ -170,13 +170,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
170
170
?missing_column_indexes,
171
171
"Custody column peer claims to not have some data"
172
172
) ;
173
-
174
- batch_request. span . record (
175
- "missing_column_indexes" ,
176
- field:: debug ( missing_column_indexes) ,
177
- ) ;
178
-
179
- self . failed_peers . insert ( peer_id) ;
180
173
}
181
174
}
182
175
Err ( err) => {
@@ -195,13 +188,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
195
188
. ok_or ( Error :: BadState ( "unknown column_index" . to_owned ( ) ) ) ?
196
189
. on_download_error_and_mark_failure ( req_id) ?;
197
190
}
198
-
199
- batch_request. span . record (
200
- "missing_column_indexes" ,
201
- field:: debug ( & batch_request. indices ) ,
202
- ) ;
203
-
204
- self . failed_peers . insert ( peer_id) ;
205
191
}
206
192
} ;
207
193
@@ -238,52 +224,29 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
238
224
let active_request_count_by_peer = cx. active_request_count_by_peer ( ) ;
239
225
let mut columns_to_request_by_peer = HashMap :: < PeerId , Vec < ColumnIndex > > :: new ( ) ;
240
226
let lookup_peers = self . lookup_peers . read ( ) ;
227
+ // Create deterministic hasher per request to ensure consistent peer ordering within
228
+ // this request (avoiding fragmentation) while varying selection across different requests
229
+ let random_state = RandomState :: new ( ) ;
241
230
242
- // Need to:
243
- // - track how many active requests a peer has for load balancing
244
- // - which peers have failures to attempt others
245
- // - which peer returned what to have PeerGroup attributability
246
-
247
- for ( column_index, request) in self . column_requests . iter_mut ( ) {
231
+ for ( column_index, request) in self . column_requests . iter ( ) {
248
232
if let Some ( wait_duration) = request. is_awaiting_download ( ) {
233
+ // Note: an empty response is considered a successful response, so we may end up
234
+ // retrying many more times than `MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS`.
249
235
if request. download_failures > MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS {
250
236
return Err ( Error :: TooManyFailures ) ;
251
237
}
252
238
253
- // TODO(das): When is a fork and only a subset of your peers know about a block, we should
254
- // only query the peers on that fork. Should this case be handled? How to handle it?
255
- let custodial_peers = cx. get_custodial_peers ( * column_index) ;
239
+ let peer_to_request = self . select_column_peer (
240
+ cx,
241
+ & active_request_count_by_peer,
242
+ & lookup_peers,
243
+ * column_index,
244
+ & random_state,
245
+ ) ;
256
246
257
- // We draw from the total set of peers, but prioritize those peers who we have
258
- // received an attestation / status / block message claiming to have imported the
259
- // lookup. The frequency of those messages is low, so drawing only from lookup_peers
260
- // could cause many lookups to take much longer or fail as they don't have enough
261
- // custody peers on a given column
262
- let mut priorized_peers = custodial_peers
263
- . iter ( )
264
- . map ( |peer| {
265
- (
266
- // Prioritize peers that claim to know have imported this block
267
- if lookup_peers. contains ( peer) { 0 } else { 1 } ,
268
- // De-prioritize peers that have failed to successfully respond to
269
- // requests recently
270
- self . failed_peers . contains ( peer) ,
271
- // Prefer peers with fewer requests to load balance across peers.
272
- // We batch requests to the same peer, so count existence in the
273
- // `columns_to_request_by_peer` as a single 1 request.
274
- active_request_count_by_peer. get ( peer) . copied ( ) . unwrap_or ( 0 )
275
- + columns_to_request_by_peer. get ( peer) . map ( |_| 1 ) . unwrap_or ( 0 ) ,
276
- // Random factor to break ties, otherwise the PeerID breaks ties
277
- rand:: rng ( ) . random :: < u32 > ( ) ,
278
- * peer,
279
- )
280
- } )
281
- . collect :: < Vec < _ > > ( ) ;
282
- priorized_peers. sort_unstable ( ) ;
283
-
284
- if let Some ( ( _, _, _, _, peer_id) ) = priorized_peers. first ( ) {
247
+ if let Some ( peer_id) = peer_to_request {
285
248
columns_to_request_by_peer
286
- . entry ( * peer_id)
249
+ . entry ( peer_id)
287
250
. or_default ( )
288
251
. push ( * column_index) ;
289
252
} else if wait_duration > MAX_STALE_NO_PEERS_DURATION {
@@ -298,6 +261,23 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
298
261
}
299
262
}
300
263
264
+ let peer_requests = columns_to_request_by_peer. len ( ) ;
265
+ if peer_requests > 0 {
266
+ let columns_requested_count = columns_to_request_by_peer
267
+ . values ( )
268
+ . map ( |v| v. len ( ) )
269
+ . sum :: < usize > ( ) ;
270
+ debug ! (
271
+ lookup_peers = lookup_peers. len( ) ,
272
+ "Requesting {} columns from {} peers" , columns_requested_count, peer_requests,
273
+ ) ;
274
+ } else {
275
+ debug ! (
276
+ lookup_peers = lookup_peers. len( ) ,
277
+ "No column peers found for look up" ,
278
+ ) ;
279
+ }
280
+
301
281
for ( peer_id, indices) in columns_to_request_by_peer. into_iter ( ) {
302
282
let request_result = cx
303
283
. data_column_lookup_request (
@@ -317,8 +297,14 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
317
297
318
298
match request_result {
319
299
LookupRequestResult :: RequestSent ( req_id) => {
300
+ * self . peer_attempts . entry ( peer_id) . or_insert ( 0 ) += 1 ;
301
+
320
302
let client = cx. network_globals ( ) . client ( & peer_id) . kind ;
321
- let batch_columns_req_span = debug_span ! ( "batch_columns_req" , %peer_id, %client, missing_column_indexes = tracing:: field:: Empty ) ;
303
+ let batch_columns_req_span = debug_span ! (
304
+ "batch_columns_req" ,
305
+ %peer_id,
306
+ %client,
307
+ ) ;
322
308
let _guard = batch_columns_req_span. clone ( ) . entered ( ) ;
323
309
for column_index in & indices {
324
310
let column_request = self
@@ -345,11 +331,54 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
345
331
346
332
Ok ( None )
347
333
}
334
+
335
+ fn select_column_peer (
336
+ & self ,
337
+ cx : & mut SyncNetworkContext < T > ,
338
+ active_request_count_by_peer : & HashMap < PeerId , usize > ,
339
+ lookup_peers : & HashSet < PeerId > ,
340
+ column_index : ColumnIndex ,
341
+ random_state : & RandomState ,
342
+ ) -> Option < PeerId > {
343
+ // We draw from the total set of peers, but prioritize those peers who we have
344
+ // received an attestation or a block from (`lookup_peers`), as the `lookup_peers` may take
345
+ // time to build up and we are likely to not find any column peers initially.
346
+ let custodial_peers = cx. get_custodial_peers ( column_index) ;
347
+ let mut prioritized_peers = custodial_peers
348
+ . iter ( )
349
+ . filter ( |peer| {
350
+ // Exclude peers that we have already made too many attempts to.
351
+ self . peer_attempts . get ( peer) . copied ( ) . unwrap_or ( 0 ) <= MAX_CUSTODY_PEER_ATTEMPTS
352
+ } )
353
+ . map ( |peer| {
354
+ (
355
+ // Prioritize peers that claim to know have imported this block
356
+ if lookup_peers. contains ( peer) { 0 } else { 1 } ,
357
+ // De-prioritize peers that we have already attempted to download from
358
+ self . peer_attempts . get ( peer) . copied ( ) . unwrap_or ( 0 ) ,
359
+ // Prefer peers with fewer requests to load balance across peers.
360
+ active_request_count_by_peer. get ( peer) . copied ( ) . unwrap_or ( 0 ) ,
361
+ // The hash ensures consistent peer ordering within this request
362
+ // to avoid fragmentation while varying selection across different requests.
363
+ random_state. hash_one ( peer) ,
364
+ * peer,
365
+ )
366
+ } )
367
+ . collect :: < Vec < _ > > ( ) ;
368
+ prioritized_peers. sort_unstable ( ) ;
369
+
370
+ prioritized_peers
371
+ . first ( )
372
+ . map ( |( _, _, _, _, peer_id) | * peer_id)
373
+ }
348
374
}
349
375
350
376
/// TODO(das): this attempt count is nested into the existing lookup request count.
351
377
const MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS : usize = 3 ;
352
378
379
+ /// Max number of attempts to request custody columns from a single peer.
380
+ const MAX_CUSTODY_PEER_ATTEMPTS : usize = 3 ;
381
+
353
382
struct ColumnRequest < E : EthSpec > {
354
383
status : Status < E > ,
355
384
download_failures : usize ,
0 commit comments