Skip to content

fix: Cap azcopy memory usage at 80% of container memory #6177

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package nextflow.cloud.azure.batch

import java.math.MathContext
import java.math.RoundingMode
import java.nio.file.Path

import groovy.transform.CompileStatic
Expand All @@ -41,11 +43,13 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy {
private Duration delayBetweenAttempts
private String sasToken
private Path remoteBinDir
private TaskBean task

protected AzFileCopyStrategy() {}

AzFileCopyStrategy(TaskBean bean, AzBatchExecutor executor) {
super(bean)
this.task = bean
this.config = executor.config
this.remoteBinDir = executor.remoteBinDir
this.sasToken = config.storage().sasToken
Expand All @@ -64,6 +68,7 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy {
copy.remove('PATH')
copy.put('PATH', '$PWD/.nextflow-bin:$AZ_BATCH_NODE_SHARED_DIR/bin/:$PATH')
copy.put('AZCOPY_LOG_LOCATION', '$PWD/.azcopy_log')
copy.put('AZCOPY_BUFFER_GB', String.valueOf((task.containerMemory?.toGiga()?: 1) * 0.8))
copy.put('AZ_SAS', sasToken)

// finally render the environment
Expand Down Expand Up @@ -92,7 +97,7 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy {

result += 'downloads=(true)\n'
result += super.getStageInputFilesScript(inputFiles) + '\n'
result += 'nxf_parallel "${downloads[@]}"\n'
result += 'for cmd in "${downloads[@]}"; do\n $cmd\ndone\n'
return result
}

Expand All @@ -103,8 +108,8 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy {
String stageInputFile( Path path, String targetName ) {
// third param should not be escaped, because it's used in the grep match rule
def stage_cmd = maxTransferAttempts > 1
? "downloads+=(\"nxf_cp_retry nxf_az_download '${AzHelper.toHttpUrl(path)}' ${Escape.path(targetName)}\")"
: "downloads+=(\"nxf_az_download '${AzHelper.toHttpUrl(path)}' ${Escape.path(targetName)}\")"
? "downloads+=(\"nxf_cp_retry nxf_az_download ${AzHelper.toHttpUrl(path)} ${Escape.path(targetName)}\")"
: "downloads+=(\"nxf_az_download ${AzHelper.toHttpUrl(path)} ${Escape.path(targetName)}\")"
return stage_cmd
}

Expand All @@ -128,10 +133,12 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy {
uploads=()
IFS=\$'\\n'
for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do
uploads+=("nxf_az_upload '\$name' '${AzHelper.toHttpUrl(targetDir)}'")
uploads+=("nxf_az_upload '\$name' ${AzHelper.toHttpUrl(targetDir)}")
done
unset IFS
nxf_parallel "\${uploads[@]}"
for cmd in "\${uploads[@]}"; do
\$cmd
done
""".stripIndent(true)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ class AzFileCopyStrategyTest extends Specification {
# stage input files
downloads=(true)

nxf_parallel "${downloads[@]}"
for cmd in "${downloads[@]}"; do
$cmd
done
'''.stripIndent()

binding.unstage_controls == '''\
Expand Down Expand Up @@ -222,14 +224,17 @@ class AzFileCopyStrategyTest extends Specification {
chmod +x $PWD/.nextflow-bin/* || true
downloads=(true)

nxf_parallel "${downloads[@]}"
for cmd in "${downloads[@]}"; do
$cmd
done
'''.stripIndent()

binding.task_env == '''\
export FOO="1"
export BAR="any"
export PATH="$PWD/.nextflow-bin:$AZ_BATCH_NODE_SHARED_DIR/bin/:$PATH"
export AZCOPY_LOG_LOCATION="$PWD/.azcopy_log"
export AZCOPY_BUFFER_GB="0.8"
export AZ_SAS="12345"
'''.stripIndent()

Expand Down Expand Up @@ -371,26 +376,31 @@ class AzFileCopyStrategyTest extends Specification {
downloads=(true)
rm -f file1.txt
rm -f file2.txt
downloads+=("nxf_az_download 'http://account.blob.core.windows.net/my-data/work/dir/file1.txt' file1.txt")
downloads+=("nxf_az_download 'http://account.blob.core.windows.net/my-data/work/dir/file2.txt' file2.txt")
nxf_parallel "${downloads[@]}"
downloads+=("nxf_az_download http://account.blob.core.windows.net/my-data/work/dir/file1.txt file1.txt")
downloads+=("nxf_az_download http://account.blob.core.windows.net/my-data/work/dir/file2.txt file2.txt")
for cmd in "${downloads[@]}"; do
$cmd
done
'''.stripIndent()

binding.unstage_outputs == '''
uploads=()
IFS=$'\\n'
for name in $(eval "ls -1d foo.txt bar.fastq" | sort | uniq); do
uploads+=("nxf_az_upload '$name' 'http://account.blob.core.windows.net/my-data/work/dir'")
uploads+=("nxf_az_upload '$name' http://account.blob.core.windows.net/my-data/work/dir")
done
unset IFS
nxf_parallel "${uploads[@]}"
for cmd in "${uploads[@]}"; do
$cmd
done
'''.stripIndent().leftTrim()

binding.launch_cmd == '/bin/bash .command.run nxf_trace'

binding.task_env == '''\
export PATH="$PWD/.nextflow-bin:$AZ_BATCH_NODE_SHARED_DIR/bin/:$PATH"
export AZCOPY_LOG_LOCATION="$PWD/.azcopy_log"
export AZCOPY_BUFFER_GB="0.8"
export AZ_SAS="12345"
'''.stripIndent()

Expand Down
Loading