Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nextflow.config
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ params {
profiles {
development { includeConfig "nextflow/configs/profiles/development.config" }
sumner2 { includeConfig "nextflow/configs/profiles/sumner2.config" }
// Standalone Dropbox -> Cloudian batch compression pipeline settings.
compression { includeConfig "nextflow/bootstrap/dropbox_to_cloudian.config" }
}

env {
Expand Down
64 changes: 64 additions & 0 deletions nextflow/bootstrap/compress_and_transfer.sbatch
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/bin/bash
#
#SBATCH --job-name=KL_Compress_And_Transfer
#
#SBATCH --qos=long
#SBATCH --time=4-0:00:00
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=1
#SBATCH --mem=16G

# Self-submitting batch driver: runs the compression pipeline on one batch
# file (batch_${CURRENT_BATCH}.txt), then re-submits itself for the next
# batch until BATCH_END is processed.
#
# 25TB of data chunks should ideally have jobs run for about 36hrs, so we request 4 days just to be safe.
#
# 2800 [files in largest batch] * (
# 1.5 [minutes retrieval] / 2 [simultaneous jobs] +
# ( 24 [minutes compression] / 250 [simultaneous jobs] ) * ( 2/3 [load efficiency] )
# ) / 60 [minutes per hour]

module load nextflow

# There are batch files in the folder containing data to process.
# Batch files are titled batch_#.txt, numbered BATCH_START..BATCH_END inclusive.
BATCH_START=1
BATCH_END=73
cd /projects/kumar-lab/bgeuther/dropbox-compression/dropbox_file_lists/batches_25TB/

# Sanity checks on environment variable pointing to a valid batch to process.
re='^[0-9]+$'
if ! [[ $CURRENT_BATCH =~ $re ]]; then
    echo "Error: CURRENT_BATCH is unset or not a number. Current value: \"${CURRENT_BATCH}\"" >&2
    # sbatch options must come BEFORE the script name; anything after the
    # script is passed to the script as a positional argument, not to sbatch.
    echo "Example submission: \"sbatch --export=CURRENT_BATCH=${BATCH_START} ${0}\"" >&2
    exit 1
elif ! [[ -f batch_${CURRENT_BATCH}.txt ]]; then
    echo "Error: batch_${CURRENT_BATCH}.txt does not exist in the working directory. Cannot process." >&2
    exit 1
fi

# Run the actual processing
nextflow run KumarLabJax/mouse-tracking-runtime \
    -r compression-standalone \
    -main-script nextflow/bootstrap/compress_dropbox_to_cloudian.nf \
    -profile compression \
    --input_batch batch_${CURRENT_BATCH}.txt \
    -with-trace \
    -with-report \
    -latest

# Exit if something went wrong.
# Check any non-zero status (not just 1): nextflow can fail with other exit
# codes, and we must not self-submit the next batch after a failure.
STATE=$?
if [[ $STATE -ne 0 ]]; then
    echo "Error: Nextflow run on batch ${CURRENT_BATCH} failed (exit code ${STATE})." >&2
    exit 1
else
    echo "Nextflow run on batch ${CURRENT_BATCH} succeeded."
fi

# Submit next batch if there is one.
NEXT_BATCH=$((CURRENT_BATCH + 1))
# -le (not -lt) so the final batch, BATCH_END itself, is also submitted.
if [[ $NEXT_BATCH -le $BATCH_END ]]; then
    # Self-submit the next batch
    SUBMITTED_JOBID=$(sbatch --parsable --export=CURRENT_BATCH="${NEXT_BATCH}" "${0}")
    echo "Submitted next processing job (batch_${NEXT_BATCH}). Job ID: ${SUBMITTED_JOBID}."
else
    echo "Batches complete! Exiting self-submitting loop."
fi
120 changes: 120 additions & 0 deletions nextflow/bootstrap/compress_dropbox_to_cloudian.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
nextflow.enable.dsl=2

/**
 * Retrieves files from Dropbox using rclone.
 *
 * @param files_to_transfer A file containing a list of files to transfer
 * @param rclone_prefix The rclone remote prefix where files are stored
 * @param rclone_config The rclone config file that provides remote dropbox authentication
 *
 * @return remote_files A file containing a list of the retrieved files with full paths.
 */
process GET_DATA_FROM_DROPBOX {
container "/projects/kumar-lab/meta/images/mouse-tracking-runtime/rclone/latest.sif"
cpus 1
time 20.m
memory 4.GB
queue 'xfer'
clusterOptions '-q xfer'
// Limit concurrent Dropbox retrievals to 2 (matches the transfer-rate
// assumption in the sbatch driver's runtime estimate).
maxForks 2

input:
path files_to_transfer
val rclone_prefix
path rclone_config

output:
path("fetched_files.txt"), emit: remote_files

script:
// --ignore-checksum: skip post-transfer checksum verification on retrieval.
// The `find` emits absolute paths so downstream steps can recover the
// directory structure that Nextflow strips from staged Path objects.
"""
rclone copy --ignore-checksum --config=${rclone_config} --transfers=1 --include-from ${files_to_transfer} ${rclone_prefix} retrieved_files/.
find \$(pwd)/retrieved_files/ -type f > fetched_files.txt
"""
}

/**
 * Video compression using constant rate factor (CRF) H.264 encoding.
 *
 * @param tuple
 *  - filename The original input filename that is being compressed (passed forward unchanged)
 *  - video_file The input video file to compress
 *
 * @return tuple files
 *  - filename Val copy of the original input filename that was compressed
 *  - file Path to the compressed video
 */
process COMPRESS_VIDEO_CRF {
container "/projects/kumar-lab/meta/images/mouse-tracking-runtime/runtime/latest.sif"
cpus 2
memory 1.GB
time 2.hours
// Submit compression tasks as SLURM job arrays of up to 200 tasks.
array 200
// A video that fails to compress is dropped from the pipeline rather than
// failing the whole batch; failed inputs remain un-transferred.
errorStrategy 'ignore'

input:
tuple val(filename), path(video_file)

output:
tuple val(filename), path("${video_file.baseName}_compressed.mp4"), emit: files

script:
// crf 23 / veryfast trades size for speed; -g 3000 sets the keyframe
// interval (presumably matched to the source frame rate — TODO confirm).
"""
ffmpeg -i ${video_file} -c:v libx264 -pix_fmt yuv420p -preset veryfast -crf 23 -g 3000 -f mp4 ${video_file.baseName}_compressed.mp4
"""
}

/**
 * Uploads a file to Cloudian using rclone.
 *
 * No declared output: this is the terminal publish step of the pipeline.
 *
 * @param tuple
 *  - result_file The path to the result file
 *  - publish_filename The desired publish destination path (expected to begin
 *    with a slash; appended directly to rclone_prefix — see the config note
 *    about Cloudian not collapsing slashes)
 * @param rclone_prefix The rclone remote prefix where files are to be uploaded
 * @param rclone_config The rclone config file that provides remote cloudian authentication
 */
process PUT_DATA_TO_CLOUDIAN {
container "/projects/kumar-lab/meta/images/mouse-tracking-runtime/rclone/latest.sif"
cpus 1
time 10.m
memory 1.GB
// Submit upload tasks as SLURM job arrays of up to 200 tasks.
array 200
queue 'xfer'
clusterOptions '-q xfer'
// Limit concurrent Cloudian uploads to 2.
maxForks 2

input:
tuple path(result_file), val(publish_filename)
val rclone_prefix
path rclone_config

script:
// --include restricts the copy of the work dir (.) to just the staged
// result file; --copy-links follows the symlink Nextflow stages it as.
"""
rclone copy --config=${rclone_config} --transfers=1 --copy-links --include ${result_file} . ${rclone_prefix}${publish_filename}
"""
}

/**
 * Main workflow to retrieve data, compress it, and send it to a different remote server.
 *
 * params.input_batch is a list of files where each line contains a remote video to compress.
 * The pipeline splits this file by line and submits individual dependent jobs that run the following 3 steps:
 * 1. Retrieve the remote video data from dropbox to local compute
 * 2. Compress the video data
 * 3. Push the compressed video to cloudian remote storage
 */
workflow {
input_files = channel.fromPath(params.input_batch)
// One single-line file per video so each remote file is retrieved by its own task.
single_line_files = input_files.splitText(by: 1, file: true)
local_videos = GET_DATA_FROM_DROPBOX(single_line_files, params.dropbox_prefix, params.dropbox_config).remote_files
// `local_videos` is a text file containing the paths to the retrieved files. This is because we need folders (which are removed from Paths by nextflow)
// Convert that to a channel of (absolute-path string, staged file) tuples.
local_video_channel = local_videos.splitText().map { line -> tuple(line.trim(), file(line.trim())) }
compressed_videos = COMPRESS_VIDEO_CRF(local_video_channel).files

// Rebuild the publish destination from the original path: strip everything
// up to and including the local "retrieved_files/" staging folder, leaving
// the remote directory structure with a leading slash (the Cloudian prefix
// in the config deliberately has no trailing slash).
sync_names = compressed_videos.map { original_file, video ->
tuple(video, "${file(original_file).getParent().toString().replaceAll(".*retrieved_files/", "/")}")
}
PUT_DATA_TO_CLOUDIAN(sync_names, params.cloudian_prefix, params.cloudian_config)

}
33 changes: 33 additions & 0 deletions nextflow/bootstrap/dropbox_to_cloudian.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Delete work directory upon successful completion of the entire workflow (all videos in the batch).
// This clears out the temporary copy of all video files.
cleanup = true
// The work directory is where videos are retrieved and compute conducted.
// NOTE(review): confirm `${USER}` resolves in the Nextflow config's Groovy
// context; if it is not bound, System.getenv('USER') may be required here.
workDir = "/flashscratch/${USER}/dropbox-compression"

// All processes run inside Singularity containers (the .sif images named in
// each process's `container` directive).
singularity {
enabled = true
autoMounts = true
}

process {
executor = 'slurm'
// Environment module loaded for each task before it runs.
module = 'slurm'
}

executor {
name = 'slurm'
// The number of tasks the executor will handle in a parallel manner
queueSize = 100
// Throttle SLURM submissions to one per second.
submitRateLimit = '1 s'
}

params {
// Path to the batch file (one remote video per line); supplied on the
// command line via --input_batch by the sbatch driver.
input_batch = ""
dropbox_config = "/projects/kumar-lab/meta/secrets/tokens/rclone_dropbox.conf"
dropbox_prefix = "labdropbox:\"/KumarLab's shared workspace/VideoData/MDS_Tests/\""

// Cloudian will not collapse slashes (it is TRUE object storage).
// This prefix assumes that the file starts with a slash.
cloudian_config = "/projects/kumar-lab/meta/secrets/tokens/rclone_cloudian.conf"
cloudian_prefix = "cloudian:dropbox-video-data/MDS_Videos"
}
5 changes: 5 additions & 0 deletions nextflow/bootstrap/nextflow.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
nextflow.enable.dsl=2

// Minimal bootstrap config so the pipeline can be run standalone from this
// directory: the compression profile pulls in all real settings.
profiles {
compression { includeConfig "dropbox_to_cloudian.config" }
}