@@ -370,30 +370,29 @@ public CompletableFuture<Boolean> run() {
370370 }
371371 runStatus .setStage (StageEnum .DATABASES , CollectionEnum .COMPLETED );
372372
373- // Collect Table Information and ensure process is complete before moving on.
374- while (true ) {
375- boolean check = true ;
376- for (CompletableFuture <ReturnStatus > sf : gtf ) {
377- if (!sf .isDone ()) {
378- check = false ;
379- break ;
380- }
381- try {
382- if (sf .isDone () && sf .get () != null ) {
383- if (sf .get ().getStatus () == ReturnStatus .Status .ERROR ) {
384- rtn = Boolean .FALSE ;
385- // throw new RuntimeException(sf.get().getException());
386- }
373+ // Wait for all the CompletableFutures to finish.
374+ CompletableFuture .allOf (gtf .toArray (new CompletableFuture [0 ])).join ();
375+
376+ // Check that all the CompletableFutures in 'gtf' passed with ReturnStatus.Status.SUCCESS.
377+ for (CompletableFuture <ReturnStatus > sf : gtf ) {
378+ try {
379+ ReturnStatus rs = sf .get ();
380+ if (nonNull (rs )) {
381+ if (rs .getStatus () == ReturnStatus .Status .SUCCESS ) {
382+ runStatus .getOperationStatistics ().getSuccesses ().incrementDatabases ();
383+ } else {
384+ rtn = Boolean .FALSE ;
385+ runStatus .getOperationStatistics ().getFailures ().incrementDatabases ();
387386 }
388- } catch (InterruptedException | ExecutionException e ) {
389- log .error ("Interrupted Table collection" , e );
390- rtn = Boolean .FALSE ;
391- // throw new RuntimeException(e);
387+ } else {
388+ log .error ("ReturnStatus is null in gathering table." );
392389 }
390+ } catch (InterruptedException | ExecutionException e ) {
391+ log .error ("Interrupted Table collection" , e );
392+ rtn = Boolean .FALSE ;
393393 }
394- if (check )
395- break ;
396394 }
395+
397396 runStatus .setStage (StageEnum .TABLES , CollectionEnum .COMPLETED );
398397 gtf .clear (); // reset
399398
@@ -497,58 +496,46 @@ public CompletableFuture<Boolean> run() {
497496 // Check that a tables metadata has been retrieved. When it has (ReturnStatus.Status.CALCULATED_SQL),
498497 // move on to the NEXTSTEP and actual do the transfer.
499498 // ========================================
500- while (true ) {
501- boolean check = true ;
502- for (CompletableFuture <ReturnStatus > sf : gtf ) {
503- if (!sf .isDone ()) {
504- check = false ;
505- break ;
506- }
507- try {
508- if (sf .isDone () && sf .get () != null ) {
509- switch (sf .get ().getStatus ()) {
510- case SUCCESS :
511- runStatus .getOperationStatistics ().getCounts ().incrementTables ();
512- // Trigger next step and set status.
513- // TODO: Next Step
514- sf .get ().setStatus (ReturnStatus .Status .NEXTSTEP );
515- // Launch the next step, which is the transfer.
516- runStatus .getOperationStatistics ().getSuccesses ().incrementTables ();
517-
518- migrationFuture .add (getTransferService ().build (sf .get ().getTableMirror ()));
519- break ;
520- case ERROR :
521- runStatus .getOperationStatistics ().getCounts ().incrementTables ();
522- sf .get ().setStatus (ReturnStatus .Status .NEXTSTEP );
523- break ;
524- case FATAL :
525- runStatus .getOperationStatistics ().getCounts ().incrementTables ();
526- runStatus .getOperationStatistics ().getFailures ().incrementTables ();
527- rtn = Boolean .FALSE ;
528- sf .get ().setStatus (ReturnStatus .Status .NEXTSTEP );
529- log .error ("FATAL: " , sf .get ().getException ());
530- case NEXTSTEP :
531- break ;
532- case SKIP :
533- runStatus .getOperationStatistics ().getCounts ().incrementTables ();
534- // Set for tables that are being removed.
535- runStatus .getOperationStatistics ().getSkipped ().incrementTables ();
536- sf .get ().setStatus (ReturnStatus .Status .NEXTSTEP );
537- break ;
538- }
499+ CompletableFuture .allOf (gtf .toArray (new CompletableFuture [0 ])).join ();
500+ // Check that all the CompletableFutures in 'gtf' passed with ReturnStatus.Status.SUCCESS.
501+ for (CompletableFuture <ReturnStatus > sf : gtf ) {
502+ try {
503+ ReturnStatus returnStatus = sf .get ();
504+ if (nonNull (returnStatus )) {
505+ switch (returnStatus .getStatus ()) {
506+ case SUCCESS :
507+ runStatus .getOperationStatistics ().getCounts ().incrementTables ();
508+ // Trigger next step and set status.
509+ // TODO: Next Step
510+ sf .get ().setStatus (ReturnStatus .Status .NEXTSTEP );
511+ // Launch the next step, which is the transfer.
512+ runStatus .getOperationStatistics ().getSuccesses ().incrementTables ();
513+
514+ migrationFuture .add (getTransferService ().build (sf .get ().getTableMirror ()));
515+ break ;
516+ case ERROR :
517+ runStatus .getOperationStatistics ().getCounts ().incrementTables ();
518+ sf .get ().setStatus (ReturnStatus .Status .NEXTSTEP );
519+ break ;
520+ case FATAL :
521+ runStatus .getOperationStatistics ().getCounts ().incrementTables ();
522+ runStatus .getOperationStatistics ().getFailures ().incrementTables ();
523+ rtn = Boolean .FALSE ;
524+ sf .get ().setStatus (ReturnStatus .Status .NEXTSTEP );
525+ log .error ("FATAL: " , sf .get ().getException ());
526+ case NEXTSTEP :
527+ break ;
528+ case SKIP :
529+ runStatus .getOperationStatistics ().getCounts ().incrementTables ();
530+ // Set for tables that are being removed.
531+ runStatus .getOperationStatistics ().getSkipped ().incrementTables ();
532+ sf .get ().setStatus (ReturnStatus .Status .NEXTSTEP );
533+ break ;
539534 }
540- } catch (InterruptedException | ExecutionException e ) {
541- rtn = Boolean .FALSE ;
542- log .error ("Interrupted" , e );
543535 }
544- }
545- if (check )
546- break ;
547- try {
548- // Slow down the loop.
549- sleep (2000 );
550- } catch (InterruptedException e ) {
551- throw new RuntimeException (e );
536+ } catch (InterruptedException | ExecutionException | RuntimeException e ) {
537+ log .error ("Interrupted Table collection" , e );
538+ rtn = Boolean .FALSE ;
552539 }
553540 }
554541
@@ -587,42 +574,32 @@ public CompletableFuture<Boolean> run() {
587574 Set <TableMirror > migrationExecutions = new HashSet <>();
588575
589576 // Check the Migration Futures are done.
590- while (true ) {
591- boolean check = true ;
592- for (CompletableFuture <ReturnStatus > sf : migrationFuture ) {
593- if (!sf .isDone ()) {
594- check = false ;
595- continue ;
596- }
597- try {
598- if (sf .isDone () && sf .get () != null ) {
599- TableMirror tableMirror = sf .get ().getTableMirror ();
600- // Only push SUCCESSFUL tables to the migrationExecutions list.
601- if (sf .get ().getStatus () == ReturnStatus .Status .SUCCESS ) {
602- // Success means add table the execution list.
603- migrationExecutions .add (tableMirror );
604- }
577+ CompletableFuture .allOf (migrationFuture .toArray (new CompletableFuture [0 ])).join ();
578+
579+ // Check that all the CompletableFutures in 'migrationFuture' passed with ReturnStatus.Status.SUCCESS.
580+ for (CompletableFuture <ReturnStatus > sf : migrationFuture ) {
581+ try {
582+ ReturnStatus rs = sf .get ();
583+ if (nonNull (rs )) {
584+ TableMirror tableMirror = rs .getTableMirror ();
585+ // Only push SUCCESSFUL tables to the migrationExecutions list.
586+ if (rs .getStatus () == ReturnStatus .Status .SUCCESS ) {
587+ // Success means add table the execution list.
588+ migrationExecutions .add (tableMirror );
605589 }
606- } catch (InterruptedException | ExecutionException e ) {
607- log .error ("Interrupted" , e );
608- rtn = Boolean .FALSE ;
609- // throw new RuntimeException(e);
590+ } else {
591+ log .error ("ReturnStatus is NULL in migration build" );
610592 }
611- }
612- if (check )
613- break ;
614- try {
615- // Slow down the loop.
616- sleep (2000 );
617- } catch (InterruptedException e ) {
618- throw new RuntimeException (e );
593+ } catch (InterruptedException | ExecutionException | RuntimeException e ) {
594+ log .error ("Interrupted Building Migrations" , e );
595+ rtn = Boolean .FALSE ;
619596 }
620597 }
598+
621599 if (rtn ) {
622600 runStatus .setStage (StageEnum .BUILDING_TABLES , CollectionEnum .COMPLETED );
623601 } else {
624602 runStatus .setStage (StageEnum .BUILDING_TABLES , CollectionEnum .ERRORED );
625- // runStatus.addError(MessageCode.BUILDING_TABLES_ISSUE);
626603 }
627604
628605 migrationFuture .clear (); // reset
@@ -682,41 +659,31 @@ public CompletableFuture<Boolean> run() {
682659 migrationFuture .add (getTransferService ().execute (tableMirror ));
683660 }
684661
685- // Check the Migration Futures are done.
686- while (true ) {
687- boolean check = true ;
688- for (CompletableFuture <ReturnStatus > sf : migrationFuture ) {
689- if (!sf .isDone ()) {
690- check = false ;
691- break ;
692- }
693- try {
694- if (sf .isDone () && sf .get () != null ) {
695- TableMirror tableMirror = sf .get ().getTableMirror ();
696- if (sf .get ().getStatus () == ReturnStatus .Status .ERROR ) {
697- // Check if the table was removed, so that's not a processing error.
698- if (tableMirror != null ) {
699- if (!tableMirror .isRemove ()) {
700- rtn = Boolean .FALSE ;
701- }
662+ // Wait for all the CompletableFutures to finish.
663+ CompletableFuture .allOf (migrationFuture .toArray (new CompletableFuture [0 ])).join ();
664+ // Check that all the CompletableFutures in 'migrationFuture' passed with ReturnStatus.Status.SUCCESS.
665+ for (CompletableFuture <ReturnStatus > sf : migrationFuture ) {
666+ try {
667+ ReturnStatus rs = sf .get ();
668+ if (nonNull (rs )) {
669+ TableMirror tableMirror = rs .getTableMirror ();
670+ if (rs .getStatus () == ReturnStatus .Status .ERROR ) {
671+ // Check if the table was removed, so that's not a processing error.
672+ if (tableMirror != null ) {
673+ if (!tableMirror .isRemove ()) {
674+ rtn = Boolean .FALSE ;
702675 }
703676 }
704677 }
705- } catch (InterruptedException | ExecutionException e ) {
706- log .error ("Interrupted" , e );
707- rtn = Boolean .FALSE ;
708- // throw new RuntimeException(e);
678+ } else {
679+ log .error ("ReturnStatus is NULL in migrationFuture" );
709680 }
710- }
711- if (check )
712- break ;
713- try {
714- // Slow down the loop.
715- sleep (2000 );
716- } catch (InterruptedException e ) {
717- throw new RuntimeException (e );
681+ } catch (InterruptedException | ExecutionException | RuntimeException e ) {
682+ log .error ("Interrupted Migration Executions" , e );
683+ rtn = Boolean .FALSE ;
718684 }
719685 }
686+
720687 // If still TRUE, then we're good.
721688 if (rtn ) {
722689 runStatus .setStage (StageEnum .PROCESSING_TABLES , CollectionEnum .COMPLETED );
0 commit comments