diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzFileCopyStrategy.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzFileCopyStrategy.groovy index 4ac9e072fe..cb4c9a635d 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzFileCopyStrategy.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzFileCopyStrategy.groovy @@ -15,6 +15,8 @@ */ package nextflow.cloud.azure.batch +import java.math.MathContext +import java.math.RoundingMode import java.nio.file.Path import groovy.transform.CompileStatic @@ -41,11 +43,13 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy { private Duration delayBetweenAttempts private String sasToken private Path remoteBinDir + private TaskBean task protected AzFileCopyStrategy() {} AzFileCopyStrategy(TaskBean bean, AzBatchExecutor executor) { super(bean) + this.task = bean this.config = executor.config this.remoteBinDir = executor.remoteBinDir this.sasToken = config.storage().sasToken @@ -64,6 +68,7 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy { copy.remove('PATH') copy.put('PATH', '$PWD/.nextflow-bin:$AZ_BATCH_NODE_SHARED_DIR/bin/:$PATH') copy.put('AZCOPY_LOG_LOCATION', '$PWD/.azcopy_log') + copy.put('AZCOPY_BUFFER_GB', String.valueOf((task.containerMemory?.toGiga()?: 1) * 0.8)) copy.put('AZ_SAS', sasToken) // finally render the environment @@ -92,7 +97,7 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy { result += 'downloads=(true)\n' result += super.getStageInputFilesScript(inputFiles) + '\n' - result += 'nxf_parallel "${downloads[@]}"\n' + result += 'for cmd in "${downloads[@]}"; do\n $cmd\ndone\n' return result } @@ -103,8 +108,8 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy { String stageInputFile( Path path, String targetName ) { // third param should not be escaped, because it's used in the grep match rule def stage_cmd = maxTransferAttempts > 1 - ? "downloads+=(\"nxf_cp_retry nxf_az_download '${AzHelper.toHttpUrl(path)}' ${Escape.path(targetName)}\")" - : "downloads+=(\"nxf_az_download '${AzHelper.toHttpUrl(path)}' ${Escape.path(targetName)}\")" + ? "downloads+=(\"nxf_cp_retry nxf_az_download ${AzHelper.toHttpUrl(path)} ${Escape.path(targetName)}\")" + : "downloads+=(\"nxf_az_download ${AzHelper.toHttpUrl(path)} ${Escape.path(targetName)}\")" return stage_cmd } @@ -128,10 +133,12 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy { uploads=() IFS=\$'\\n' for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do - uploads+=("nxf_az_upload '\$name' '${AzHelper.toHttpUrl(targetDir)}'") + uploads+=("nxf_az_upload '\$name' ${AzHelper.toHttpUrl(targetDir)}") done unset IFS - nxf_parallel "\${uploads[@]}" + for cmd in "\${uploads[@]}"; do + \$cmd + done """.stripIndent(true) } diff --git a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzFileCopyStrategyTest.groovy b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzFileCopyStrategyTest.groovy index 436fcb2ce9..42c75eeb7f 100644 --- a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzFileCopyStrategyTest.groovy +++ b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzFileCopyStrategyTest.groovy @@ -85,7 +85,9 @@ class AzFileCopyStrategyTest extends Specification { # stage input files downloads=(true) - nxf_parallel "${downloads[@]}" + for cmd in "${downloads[@]}"; do + $cmd + done '''.stripIndent() binding.unstage_controls == '''\ @@ -222,7 +224,9 @@ class AzFileCopyStrategyTest extends Specification { chmod +x $PWD/.nextflow-bin/* || true downloads=(true) - nxf_parallel "${downloads[@]}" + for cmd in "${downloads[@]}"; do + $cmd + done '''.stripIndent() binding.task_env == '''\ @@ -230,6 +234,7 @@ class AzFileCopyStrategyTest extends Specification { export BAR="any" export PATH="$PWD/.nextflow-bin:$AZ_BATCH_NODE_SHARED_DIR/bin/:$PATH" export AZCOPY_LOG_LOCATION="$PWD/.azcopy_log" + export AZCOPY_BUFFER_GB="0.8" export AZ_SAS="12345" '''.stripIndent() @@ -371,19 +376,23 @@ class AzFileCopyStrategyTest extends Specification { downloads=(true) rm -f file1.txt rm -f file2.txt - downloads+=("nxf_az_download 'http://account.blob.core.windows.net/my-data/work/dir/file1.txt' file1.txt") - downloads+=("nxf_az_download 'http://account.blob.core.windows.net/my-data/work/dir/file2.txt' file2.txt") - nxf_parallel "${downloads[@]}" + downloads+=("nxf_az_download http://account.blob.core.windows.net/my-data/work/dir/file1.txt file1.txt") + downloads+=("nxf_az_download http://account.blob.core.windows.net/my-data/work/dir/file2.txt file2.txt") + for cmd in "${downloads[@]}"; do + $cmd + done '''.stripIndent() binding.unstage_outputs == ''' uploads=() IFS=$'\\n' for name in $(eval "ls -1d foo.txt bar.fastq" | sort | uniq); do - uploads+=("nxf_az_upload '$name' 'http://account.blob.core.windows.net/my-data/work/dir'") + uploads+=("nxf_az_upload '$name' http://account.blob.core.windows.net/my-data/work/dir") done unset IFS - nxf_parallel "${uploads[@]}" + for cmd in "${uploads[@]}"; do + $cmd + done '''.stripIndent().leftTrim() binding.launch_cmd == '/bin/bash .command.run nxf_trace' @@ -391,6 +400,7 @@ class AzFileCopyStrategyTest extends Specification { binding.task_env == '''\ export PATH="$PWD/.nextflow-bin:$AZ_BATCH_NODE_SHARED_DIR/bin/:$PATH" export AZCOPY_LOG_LOCATION="$PWD/.azcopy_log" + export AZCOPY_BUFFER_GB="0.8" export AZ_SAS="12345" '''.stripIndent()