11use crate :: {
22 core:: {
33 entities:: { edges:: edge_ref:: EdgeRef , LayerIds , VID } ,
4+ utils:: iter:: GenLockedIter ,
45 Direction ,
56 } ,
67 db:: api:: {
@@ -21,7 +22,10 @@ use polars_arrow::datatypes::ArrowDataType;
2122use pometry_storage:: {
2223 graph:: TemporalGraph , timestamps:: LayerAdditions , tprops:: DiskTProp , GidRef ,
2324} ;
24- use raphtory_api:: core:: storage:: timeindex:: { TimeIndexEntry , TimeIndexIntoOps } ;
25+ use raphtory_api:: {
26+ core:: storage:: timeindex:: { TimeIndexEntry , TimeIndexIntoOps } ,
27+ iter:: IntoDynBoxed ,
28+ } ;
2529use std:: { borrow:: Cow , iter, ops:: Range , sync:: Arc } ;
2630
2731#[ derive( Copy , Clone , Debug ) ]
@@ -366,87 +370,104 @@ impl DiskOwnedNode {
366370 match layers {
367371 LayerIds :: None => LayerVariants :: None ( iter:: empty ( ) ) ,
368372 LayerIds :: All => {
369- let layers = self . graph . arc_layers ( ) . clone ( ) ;
370- LayerVariants :: All (
373+ let iter = GenLockedIter :: from ( self , |owned_node| {
374+ let layers = owned_node . graph . arc_layers ( ) ;
371375 ( 0 ..layers. len ( ) )
372376 . map ( move |layer_id| {
373377 let layer = & layers[ layer_id] ;
374- let eids = layer. nodes_storage ( ) . into_out_edges_iter ( self . vid ) ;
375- let nbrs = layer. nodes_storage ( ) . into_out_neighbours_iter ( self . vid ) ;
376- eids. zip ( nbrs)
377- . map ( move |( eid, dst) | EdgeRef :: new_outgoing ( eid, self . vid , dst) )
378+ let eids = layer. nodes_storage ( ) . out_edges_iter ( owned_node. vid ) ;
379+ let nbrs = layer. nodes_storage ( ) . out_neighbours_iter ( owned_node. vid ) ;
380+ eids. zip ( nbrs) . map ( move |( eid, dst) | {
381+ EdgeRef :: new_outgoing ( eid, owned_node. vid , dst)
382+ } )
378383 } )
379384 . kmerge_by ( |e1, e2| e1. remote ( ) <= e2. remote ( ) )
380- . dedup_by ( |e1, e2| e1. remote ( ) == e2. remote ( ) ) ,
381- )
385+ . dedup_by ( |e1, e2| e1. remote ( ) == e2. remote ( ) )
386+ . into_dyn_boxed ( )
387+ } ) ;
388+ LayerVariants :: All ( iter)
382389 }
383390 LayerIds :: One ( layer_id) => {
384- let layer = self . graph . layer ( layer_id ) ;
385- let eids = layer . nodes_storage ( ) . into_out_edges_iter ( self . vid ) ;
386- let nbrs = layer. nodes_storage ( ) . into_out_neighbours_iter ( self . vid ) ;
387- LayerVariants :: One (
391+ let iter = GenLockedIter :: from ( self , |owned_node| {
392+ let layer = owned_node . graph . layer ( layer_id ) ;
393+ let eids = layer. nodes_storage ( ) . out_edges_iter ( owned_node . vid ) ;
394+ let nbrs = layer . nodes_storage ( ) . out_neighbours_iter ( owned_node . vid ) ;
388395 eids. zip ( nbrs)
389- . map ( move |( eid, dst) | EdgeRef :: new_outgoing ( eid, self . vid , dst) ) ,
390- )
396+ . map ( move |( eid, dst) | EdgeRef :: new_outgoing ( eid, owned_node. vid , dst) )
397+ . into_dyn_boxed ( )
398+ } ) ;
399+ LayerVariants :: One ( iter)
400+ }
401+ LayerIds :: Multiple ( ids) => {
402+ LayerVariants :: Multiple ( GenLockedIter :: from ( self , |owned_node| {
403+ ids. into_iter ( )
404+ . map ( move |layer_id| {
405+ let layer = owned_node. graph . layer ( layer_id) ;
406+ let eids = layer. nodes_storage ( ) . out_edges_iter ( owned_node. vid ) ;
407+ let nbrs = layer. nodes_storage ( ) . out_neighbours_iter ( owned_node. vid ) ;
408+ let src = owned_node. vid ;
409+ eids. zip ( nbrs)
410+ . map ( move |( eid, dst) | EdgeRef :: new_outgoing ( eid, src, dst) )
411+ } )
412+ . kmerge_by ( |e1, e2| e1. remote ( ) <= e2. remote ( ) )
413+ . dedup_by ( |e1, e2| e1. remote ( ) == e2. remote ( ) )
414+ . into_dyn_boxed ( )
415+ } ) )
391416 }
392- LayerIds :: Multiple ( ids) => LayerVariants :: Multiple (
393- ids. into_iter ( )
394- . map ( move |layer_id| {
395- let layer = self . graph . layer ( layer_id) ;
396- let eids = layer. nodes_storage ( ) . into_out_edges_iter ( self . vid ) ;
397- let nbrs = layer. nodes_storage ( ) . into_out_neighbours_iter ( self . vid ) ;
398- let src = self . vid ;
399- eids. zip ( nbrs)
400- . map ( move |( eid, dst) | EdgeRef :: new_outgoing ( eid, src, dst) )
401- } )
402- . kmerge_by ( |e1, e2| e1. remote ( ) <= e2. remote ( ) )
403- . dedup_by ( |e1, e2| e1. remote ( ) == e2. remote ( ) ) ,
404- ) ,
405417 }
406418 }
407419
408420 pub fn in_edges ( self , layers : LayerIds ) -> impl Iterator < Item = EdgeRef > {
409421 match layers {
410422 LayerIds :: None => LayerVariants :: None ( iter:: empty ( ) ) ,
411423 LayerIds :: All => {
412- let layers = self . graph . arc_layers ( ) . clone ( ) ;
413- LayerVariants :: All (
424+ let iter = GenLockedIter :: from ( self , |owned_node| {
425+ let layers = owned_node . graph . arc_layers ( ) ;
414426 ( 0 ..layers. len ( ) )
415427 . map ( move |layer_id| {
416428 let layer = & layers[ layer_id] ;
417- let eids = layer. nodes_storage ( ) . into_in_edges_iter ( self . vid ) ;
418- let nbrs = layer. nodes_storage ( ) . into_in_neighbours_iter ( self . vid ) ;
419- let dst = self . vid ;
429+ let eids = layer. nodes_storage ( ) . in_edges_iter ( owned_node . vid ) ;
430+ let nbrs = layer. nodes_storage ( ) . in_neighbours_iter ( owned_node . vid ) ;
431+ let dst = owned_node . vid ;
420432 eids. zip ( nbrs)
421433 . map ( move |( eid, src) | EdgeRef :: new_incoming ( eid, src, dst) )
422434 } )
423435 . kmerge_by ( |e1, e2| e1. remote ( ) <= e2. remote ( ) )
424- . dedup_by ( |e1, e2| e1. remote ( ) == e2. remote ( ) ) ,
425- )
436+ . dedup_by ( |e1, e2| e1. remote ( ) == e2. remote ( ) )
437+ . into_dyn_boxed ( )
438+ } ) ;
439+ LayerVariants :: All ( iter)
426440 }
427441 LayerIds :: One ( layer_id) => {
428- let layer = self . graph . layer ( layer_id ) ;
429- let eids = layer . nodes_storage ( ) . into_in_edges_iter ( self . vid ) ;
430- let nbrs = layer. nodes_storage ( ) . into_in_neighbours_iter ( self . vid ) ;
431- let dst = self . vid ;
432- LayerVariants :: One (
442+ let iter = GenLockedIter :: from ( self , |owned_node| {
443+ let layer = owned_node . graph . layer ( layer_id ) ;
444+ let eids = layer. nodes_storage ( ) . in_edges_iter ( owned_node . vid ) ;
445+ let nbrs = layer . nodes_storage ( ) . in_neighbours_iter ( owned_node . vid ) ;
446+ let dst = owned_node . vid ;
433447 eids. zip ( nbrs)
434- . map ( move |( eid, src) | EdgeRef :: new_incoming ( eid, src, dst) ) ,
435- )
448+ . map ( move |( eid, src) | EdgeRef :: new_incoming ( eid, src, dst) )
449+ . into_dyn_boxed ( )
450+ } ) ;
451+ LayerVariants :: One ( iter)
452+ }
453+ LayerIds :: Multiple ( ids) => {
454+ let iter = GenLockedIter :: from ( self , |owned_node| {
455+ ids. into_iter ( )
456+ . map ( move |layer_id| {
457+ let layer = owned_node. graph . layer ( layer_id) ;
458+ let eids = layer. nodes_storage ( ) . in_edges_iter ( owned_node. vid ) ;
459+ let nbrs = layer. nodes_storage ( ) . in_neighbours_iter ( owned_node. vid ) ;
460+ let dst = owned_node. vid ;
461+ eids. zip ( nbrs)
462+ . map ( move |( eid, src) | EdgeRef :: new_incoming ( eid, src, dst) )
463+ } )
464+ . kmerge_by ( |e1, e2| e1. remote ( ) <= e2. remote ( ) )
465+ . dedup_by ( |e1, e2| e1. remote ( ) == e2. remote ( ) )
466+ . into_dyn_boxed ( )
467+ } ) ;
468+
469+ LayerVariants :: Multiple ( iter)
436470 }
437- LayerIds :: Multiple ( ids) => LayerVariants :: Multiple (
438- ids. into_iter ( )
439- . map ( move |layer_id| {
440- let layer = self . graph . layer ( layer_id) ;
441- let eids = layer. nodes_storage ( ) . into_in_edges_iter ( self . vid ) ;
442- let nbrs = layer. nodes_storage ( ) . into_in_neighbours_iter ( self . vid ) ;
443- let dst = self . vid ;
444- eids. zip ( nbrs)
445- . map ( move |( eid, src) | EdgeRef :: new_incoming ( eid, src, dst) )
446- } )
447- . kmerge_by ( |e1, e2| e1. remote ( ) <= e2. remote ( ) )
448- . dedup_by ( |e1, e2| e1. remote ( ) == e2. remote ( ) ) ,
449- ) ,
450471 }
451472 }
452473
0 commit comments