diff --git a/nextflow.config b/nextflow.config
index 3d8189c..a3a60fc 100644
--- a/nextflow.config
+++ b/nextflow.config
@@ -61,6 +61,7 @@ params {
 profiles {
     development { includeConfig "nextflow/configs/profiles/development.config" }
     sumner2 { includeConfig "nextflow/configs/profiles/sumner2.config" }
+    compression { includeConfig "nextflow/bootstrap/dropbox_to_cloudian.config" }
 }
 
 env {
diff --git a/nextflow/bootstrap/compress_and_transfer.sbatch b/nextflow/bootstrap/compress_and_transfer.sbatch
new file mode 100644
index 0000000..210bf44
--- /dev/null
+++ b/nextflow/bootstrap/compress_and_transfer.sbatch
@@ -0,0 +1,64 @@
+#!/bin/bash
+#
+#SBATCH --job-name=KL_Compress_And_Transfer
+#
+#SBATCH --qos=long
+#SBATCH --time=4-0:00:00
+#SBATCH --ntasks=1
+#SBATCH --cpus-per-task=1
+#SBATCH --mem=16G
+
+# Each 25TB chunk of data should keep jobs running for about 36hrs, so we request 4 days just to be safe.
+#
+# 2800 [files in largest batch] * (
+#   1.5 [minutes retrieval] / 2 [simultaneous jobs] +
+#   ( 24 [minutes compression] / 250 [simultaneous jobs] ) * ( 2/3 [load efficiency] )
+# ) / 60 [minutes per hour]
+
+module load nextflow
+
+# The folder below contains the batch files listing the data to process.
+# Batch files are named batch_#.txt
+BATCH_START=1
+BATCH_END=73
+cd /projects/kumar-lab/bgeuther/dropbox-compression/dropbox_file_lists/batches_25TB/ || exit 1
+
+# Sanity-check that the CURRENT_BATCH environment variable points to a valid batch to process.
+re='^[0-9]+$'
+if ! [[ $CURRENT_BATCH =~ $re ]]; then
+    echo "Error: CURRENT_BATCH is unset or not a number. Current value: \"${CURRENT_BATCH}\"" >&2
+    echo "Example submission: \"sbatch --export=CURRENT_BATCH=1 ${0}\"" >&2
+    exit 1
+elif ! [[ -f batch_${CURRENT_BATCH}.txt ]]; then
+    echo "Error: batch_${CURRENT_BATCH}.txt does not exist in the working directory. Cannot process." >&2
+    exit 1
+fi
+
+# Run the actual processing
+nextflow run KumarLabJax/mouse-tracking-runtime \
+    -r compression-standalone \
+    -main-script nextflow/bootstrap/compress_dropbox_to_cloudian.nf \
+    -profile compression \
+    --input_batch batch_${CURRENT_BATCH}.txt \
+    -with-trace \
+    -with-report \
+    -latest
+
+# Exit if something went wrong
+STATE=$?
+if [[ $STATE -ne 0 ]]; then
+    echo "Error: Nextflow run on batch ${CURRENT_BATCH} failed with exit code ${STATE}." >&2
+    exit 1
+else
+    echo "Nextflow run on batch ${CURRENT_BATCH} succeeded."
+fi
+
+# Submit the next batch if there is one.
+NEXT_BATCH=$((CURRENT_BATCH + 1))
+if [[ $NEXT_BATCH -le $BATCH_END ]]; then
+    # Self-submit the next batch
+    SUBMITTED_JOBID=$(sbatch --parsable --export=CURRENT_BATCH="${NEXT_BATCH}" "${0}")
+    echo "Submitted next processing job (batch_${NEXT_BATCH}). Job ID: ${SUBMITTED_JOBID}."
+else
+    echo "Batches complete! Exiting self-submitting loop."
+fi
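Usage note: the self-submitting chain above must be primed manually with the first batch. A minimal launch, assuming the script is invoked from the repository root, looks like:

    sbatch --export=CURRENT_BATCH=1 nextflow/bootstrap/compress_and_transfer.sbatch

Each completed job then re-submits itself with CURRENT_BATCH incremented until BATCH_END is reached, so a single submission walks all 73 batches.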
diff --git a/nextflow/bootstrap/compress_dropbox_to_cloudian.nf b/nextflow/bootstrap/compress_dropbox_to_cloudian.nf
new file mode 100644
index 0000000..7fade80
--- /dev/null
+++ b/nextflow/bootstrap/compress_dropbox_to_cloudian.nf
@@ -0,0 +1,122 @@
+nextflow.enable.dsl=2
+
+/**
+ * Retrieves files from Dropbox using rclone.
+ *
+ * @param files_to_transfer A file containing a list of files to transfer
+ * @param rclone_prefix The rclone remote prefix where files are stored
+ * @param rclone_config The rclone config file that provides remote dropbox authentication
+ *
+ * @return remote_files A file containing a list of the retrieved files with full paths.
+ */
+process GET_DATA_FROM_DROPBOX {
+    container "/projects/kumar-lab/meta/images/mouse-tracking-runtime/rclone/latest.sif"
+    cpus 1
+    time 20.m
+    memory 4.GB
+    queue 'xfer'
+    clusterOptions '-q xfer'
+    maxForks 2
+    errorStrategy 'ignore'
+
+    input:
+    path files_to_transfer
+    val rclone_prefix
+    path rclone_config
+
+    output:
+    path("fetched_files.txt"), emit: remote_files
+
+    script:
+    """
+    rclone copy --ignore-checksum --config=${rclone_config} --transfers=1 --include-from ${files_to_transfer} ${rclone_prefix} retrieved_files/.
+    find \$(pwd)/retrieved_files/ -type f > fetched_files.txt
+    """
+}
+
+/**
+ * Video compression.
+ *
+ * @param tuple
+ * - filename The original input filename being compressed (passed forward)
+ * - video_file The input video file to compress
+ *
+ * @return tuple files
+ * - filename Copy of the original input filename (val)
+ * - file Path to the compressed video
+ */
+process COMPRESS_VIDEO_CRF {
+    container "/projects/kumar-lab/meta/images/mouse-tracking-runtime/runtime/latest.sif"
+    cpus 2
+    memory 1.GB
+    time 2.hours
+    array 200
+    errorStrategy 'ignore'
+
+    input:
+    tuple val(filename), path(video_file)
+
+    output:
+    tuple val(filename), path("${video_file.baseName}_compressed.mp4"), emit: files
+
+    script:
+    """
+    ffmpeg -i ${video_file} -c:v libx264 -pix_fmt yuv420p -preset veryfast -crf 23 -g 3000 -f mp4 ${video_file.baseName}_compressed.mp4
+    """
+}
+
+/**
+ * Uploads a file to Cloudian using rclone.
+ *
+ * @param tuple
+ * - result_file The path to the result file
+ * - publish_filename The desired publish filename
+ * @param rclone_prefix The rclone remote prefix where files are to be uploaded
+ * @param rclone_config The rclone config file that provides remote cloudian authentication
+ */
+process PUT_DATA_TO_CLOUDIAN {
+    container "/projects/kumar-lab/meta/images/mouse-tracking-runtime/rclone/latest.sif"
+    cpus 1
+    time 10.m
+    memory 1.GB
+    array 200
+    queue 'xfer'
+    clusterOptions '-q xfer'
+    maxForks 2
+    errorStrategy 'ignore'
+
+    input:
+    tuple path(result_file), val(publish_filename)
+    val rclone_prefix
+    path rclone_config
+
+    script:
+    """
+    rclone copy --config=${rclone_config} --transfers=1 --copy-links --include ${result_file} . ${rclone_prefix}${publish_filename}
+    """
+}
+
+/**
+ * Main workflow to retrieve data, compress it, and send it to a different remote server.
+ *
+ * params.input_batch is a text file where each line names a remote video to compress.
+ * The pipeline splits this file by line and submits individual dependent jobs that run the following 3 steps:
+ * 1. Retrieve the remote video data from dropbox to local compute
+ * 2. Compress the video data
+ * 3. Push the compressed video to cloudian remote storage
+ */
+workflow {
+    input_files = channel.fromPath(params.input_batch)
+    single_line_files = input_files.splitText(by: 1, file: true)
+    local_videos = GET_DATA_FROM_DROPBOX(single_line_files, params.dropbox_prefix, params.dropbox_config).remote_files
+    // `local_videos` is a text file listing the retrieved paths because we need the folder structure, which Nextflow strips from staged Paths.
+    // Convert that to a channel
+    local_video_channel = local_videos.splitText().map { line -> tuple(line.trim(), file(line.trim())) }
+    compressed_videos = COMPRESS_VIDEO_CRF(local_video_channel).files
+
+    sync_names = compressed_videos.map { original_file, video ->
+        tuple(video, "${file(original_file).getParent().toString().replaceAll(".*retrieved_files/", "/")}")
+    }
+    PUT_DATA_TO_CLOUDIAN(sync_names, params.cloudian_prefix, params.cloudian_config)
+
+}
\ No newline at end of file
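For clarity on the publish-path logic: the sync_names closure takes the parent directory of each retrieved file and strips everything up to and including retrieved_files/, leaving a leading slash, so the Dropbox folder structure is mirrored under the Cloudian prefix. A minimal Groovy sketch of the same transformation, using a hypothetical staged path:

    // Hypothetical path produced by GET_DATA_FROM_DROPBOX inside a work directory
    def original_file = "/flashscratch/user/work/ab/123/retrieved_files/StrainSurvey/Cage1/video.avi"
    // Same regex as the sync_names closure: keep only the path below retrieved_files/
    def publish_filename = new File(original_file).getParent().replaceAll(".*retrieved_files/", "/")
    assert publish_filename == "/StrainSurvey/Cage1"
    // PUT_DATA_TO_CLOUDIAN concatenates this onto cloudian_prefix, so the compressed
    // file is published under cloudian:dropbox-video-data/MDS_Videos/StrainSurvey/Cage1/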
diff --git a/nextflow/bootstrap/dropbox_to_cloudian.config b/nextflow/bootstrap/dropbox_to_cloudian.config
new file mode 100644
index 0000000..d8a5adc
--- /dev/null
+++ b/nextflow/bootstrap/dropbox_to_cloudian.config
@@ -0,0 +1,33 @@
+// Delete the work directory upon successful completion of the entire workflow (all videos in the batch).
+// This clears out the temporary copy of all video files.
+cleanup = true
+// The work directory is where videos are retrieved and computation is conducted.
+workDir = "/flashscratch/${USER}/dropbox-compression"
+
+singularity {
+    enabled = true
+    autoMounts = true
+}
+
+process {
+    executor = 'slurm'
+    module = 'slurm'
+}
+
+executor {
+    name = 'slurm'
+    // The number of tasks the executor will handle in parallel
+    queueSize = 100
+    submitRateLimit = '1 s'
+}
+
+params {
+    input_batch = ""
+    dropbox_config = "/projects/kumar-lab/meta/secrets/tokens/rclone_dropbox.conf"
+    dropbox_prefix = "labdropbox:\"/KumarLab's shared workspace/VideoData/MDS_Tests/\""
+
+    // Cloudian will not collapse slashes (it is TRUE object storage).
+    // This prefix assumes that the publish filename starts with a slash.
+    cloudian_config = "/projects/kumar-lab/meta/secrets/tokens/rclone_cloudian.conf"
+    cloudian_prefix = "cloudian:dropbox-video-data/MDS_Videos"
+}
diff --git a/nextflow/bootstrap/nextflow.config b/nextflow/bootstrap/nextflow.config
new file mode 100644
index 0000000..d7fd58c
--- /dev/null
+++ b/nextflow/bootstrap/nextflow.config
@@ -0,0 +1,5 @@
+nextflow.enable.dsl=2
+
+profiles {
+    compression { includeConfig "dropbox_to_cloudian.config" }
+}
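Because Cloudian will not collapse repeated slashes, the prefix and the publish filename are designed to join with exactly one separator: cloudian_prefix has no trailing slash and every publish_filename produced by the workflow starts with one. If the remotes need a sanity check before a full batch run, something like the following (assuming the token files are readable and both remotes are reachable) should list the destination and source directories:

    rclone lsd --config=/projects/kumar-lab/meta/secrets/tokens/rclone_cloudian.conf cloudian:dropbox-video-data/MDS_Videos
    rclone lsd --config=/projects/kumar-lab/meta/secrets/tokens/rclone_dropbox.conf labdropbox:"/KumarLab's shared workspace/VideoData/MDS_Tests/"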