diff --git a/.github/workflows/qiita-plugin-ci.yml b/.github/workflows/qiita-plugin-ci.yml index 46d280dc..92dceaac 100644 --- a/.github/workflows/qiita-plugin-ci.yml +++ b/.github/workflows/qiita-plugin-ci.yml @@ -83,7 +83,7 @@ jobs: # seqtk is a conda-installed requirement for metapool and isn't # installed automatically by its setup.py. conda config --add channels bioconda - conda create -q --yes -n klp python=3.9 'seqtk>=1.4' + conda create -q --yes -n klp python=3.9 'seqtk>=1.4' pbtk conda activate klp diff --git a/pyproject.toml b/pyproject.toml index fb74ed59..09a38ea5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,3 +59,5 @@ dependencies = [ configure_klp = "qp_klp.scripts.configure_klp:config" start_klp = "qp_klp.scripts.start_klp:execute" demux = "sequence_processing_pipeline.scripts.cli:demux" +demux_just_fwd = "sequence_processing_pipeline.Commands:demux_just_fwd" +pacbio_generate_bam2fastq_commands = "qp_klp.scripts.pacbio_commands:generate_bam2fastq_commands" diff --git a/src/qp_klp/Assays.py b/src/qp_klp/Assays.py index 3ad6e873..7ed35cf3 100644 --- a/src/qp_klp/Assays.py +++ b/src/qp_klp/Assays.py @@ -494,6 +494,7 @@ def qc_reads(self): # is not defined, just fallback to the SPP expected default regex if not hasattr(self, 'files_regex'): self.files_regex = 'SPP' + # base quality control used by multiple Assay types. job = NuQCJob(self.raw_fastq_files_path, self.pipeline.output_path, @@ -517,7 +518,8 @@ def qc_reads(self): bucket_size=config['bucket_size'], length_limit=config['length_limit'], cores_per_task=config['cores_per_task'], - files_regex=self.files_regex) + files_regex=self.files_regex, + read_length=self.read_length) if 'NuQCJob' not in self.skip_steps: job.run(callback=self.job_callback) diff --git a/src/qp_klp/PacBioMetagenomicWorkflow.py b/src/qp_klp/PacBioMetagenomicWorkflow.py new file mode 100644 index 00000000..7a83d8e5 --- /dev/null +++ b/src/qp_klp/PacBioMetagenomicWorkflow.py @@ -0,0 +1,71 @@ +from .Protocol import PacBio +from sequence_processing_pipeline.Pipeline import Pipeline +from .Assays import Metagenomic +from .Assays import ASSAY_NAME_METAGENOMIC +from .FailedSamplesRecord import FailedSamplesRecord +from .Workflows import Workflow +import pandas as pd +from os.path import join + + +class PacBioMetagenomicWorkflow(Workflow, Metagenomic, PacBio): + def __init__(self, **kwargs): + super().__init__(**kwargs) + + self.mandatory_attributes = ['qclient', 'uif_path', + 'lane_number', 'config_fp', + 'run_identifier', 'output_dir', 'job_id', + 'is_restart'] + + self.confirm_mandatory_attributes() + + # second stage initializer that could conceivably be pushed down into + # specific children requiring specific parameters. 
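+        # for PacBio that means wiring up the Qiita client, building the
+        # Pipeline from the sample-sheet, and staging the sample_list.tsv
+        # later consumed by pacbio_generate_bam2fastq_commands.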
+ self.qclient = self.kwargs['qclient'] + + self.overwrite_prep_with_original = False + if 'overwrite_prep_with_original' in self.kwargs: + self.overwrite_prep_with_original = \ + self.kwargs['overwrite_prep_with_original'] + self.pipeline = Pipeline(self.kwargs['config_fp'], + self.kwargs['run_identifier'], + self.kwargs['uif_path'], + self.kwargs['output_dir'], + self.kwargs['job_id'], + ASSAY_NAME_METAGENOMIC, + lane_number=self.kwargs['lane_number']) + + self.fsr = FailedSamplesRecord(self.kwargs['output_dir'], + self.pipeline.sample_sheet.samples) + + samples = [ + {'barcode': sample['barcode_id'], + 'sample_name': sample['Sample_ID'], + 'project_name': sample['Sample_Project'], + 'lane': sample['Lane']} + for sample in self.pipeline.sample_sheet.samples] + df = pd.DataFrame(samples) + sample_list_fp = f"{self.kwargs['output_dir']}/sample_list.tsv" + df.to_csv(sample_list_fp, sep='\t', index=False) + + self.master_qiita_job_id = self.kwargs['job_id'] + + self.lane_number = self.kwargs['lane_number'] + self.is_restart = bool(self.kwargs['is_restart']) + + if self.is_restart is True: + self.raw_fastq_files_path = join(self.pipeline.output_path, + 'ConvertJob') + self.reports_path = join(self.raw_fastq_files_path, + 'SeqCounts.csv') + self.determine_steps_to_skip() + + # this is a convenience member to allow testing w/out updating Qiita. + self.update = True + + if 'update_qiita' in kwargs: + if not isinstance(kwargs['update_qiita'], bool): + raise ValueError("value for 'update_qiita' must be of " + "type bool") + + self.update = kwargs['update_qiita'] diff --git a/src/qp_klp/Protocol.py b/src/qp_klp/Protocol.py index 1a3d805e..4191d757 100644 --- a/src/qp_klp/Protocol.py +++ b/src/qp_klp/Protocol.py @@ -1,14 +1,17 @@ -from sequence_processing_pipeline.ConvertJob import ConvertJob +from sequence_processing_pipeline.ConvertJob import ( + ConvertJob, ConvertPacBioBam2FastqJob) from sequence_processing_pipeline.TellReadJob import TellReadJob from sequence_processing_pipeline.SeqCountsJob import SeqCountsJob from sequence_processing_pipeline.TRIntegrateJob import TRIntegrateJob from sequence_processing_pipeline.PipelineError import PipelineError from sequence_processing_pipeline.util import determine_orientation -from os.path import join, split, basename, dirname from re import match from os import makedirs, rename, walk +from os.path import join, split, basename, dirname, exists from metapool import load_sample_sheet -from metapool.sample_sheet import PROTOCOL_NAME_ILLUMINA, PROTOCOL_NAME_TELLSEQ +from metapool.sample_sheet import ( + PROTOCOL_NAME_ILLUMINA, PROTOCOL_NAME_TELLSEQ, + PROTOCOL_NAME_PACBIO_SMRT) import pandas as pd from glob import glob from qiita_client.util import system_call @@ -79,6 +82,26 @@ def subsample_reads(self): class Illumina(Protocol): protocol_type = PROTOCOL_NAME_ILLUMINA + # required files for successful operation for Illumina (making the default + # here) both RTAComplete.txt and RunInfo.xml should reside in the root of + # the run directory. + required_files = ['RTAComplete.txt', 'RunInfo.xml'] + read_length = 'short' + + def __init__(self) -> None: + super().__init__() + + for some_file in self.required_files: + if not exists(join(self.run_dir, some_file)): + raise PipelineError(f"required file '{some_file}' is not " + f"present in {self.run_dir}.") + + # verify that RunInfo.xml file is readable. 
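+        # a simple open/close is enough here; the goal is only to surface a
+        # PermissionError early rather than deep inside ConvertJob.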
+ try: + fp = open(join(self.run_dir, 'RunInfo.xml')) + fp.close() + except PermissionError: + raise PipelineError('RunInfo.xml is present, but not readable') def convert_raw_to_fastq(self): def get_config(command): @@ -156,6 +179,7 @@ def generate_sequence_counts(self): class TellSeq(Protocol): protocol_type = PROTOCOL_NAME_TELLSEQ + read_length = 'short' def convert_raw_to_fastq(self): config = self.pipeline.get_software_configuration('tell-seq') @@ -369,3 +393,75 @@ def _post_process_file(self, fastq_file, mapping, lane): rename(fastq_file, final_path) return final_path + + +class PacBio(Protocol): + protocol_type = PROTOCOL_NAME_PACBIO_SMRT + read_length = 'long' + + def convert_raw_to_fastq(self): + config = self.pipeline.get_software_configuration('pacbio_convert') + + job = ConvertPacBioBam2FastqJob( + self.pipeline.run_dir, + self.pipeline.output_path, + self.pipeline.input_file_path, + config['queue'], + config['nodes'], + config['nprocs'], + config['wallclock_time_in_minutes'], + config['per_process_memory_limit'], + config['executable_path'], + config['modules_to_load'], + self.master_qiita_job_id) + + self.raw_fastq_files_path = join(self.pipeline.output_path, + 'ConvertJob') + + # if ConvertJob already completed, then skip the over the time- + # consuming portion but populate the needed member variables. + if 'ConvertJob' not in self.skip_steps: + job.run(callback=self.job_callback) + + # audit the results to determine which samples failed to convert + # properly. Append these to the failed-samples report and also + # return the list directly to the caller. + failed_samples = job.audit(self.pipeline.get_sample_ids()) + if hasattr(self, 'fsr'): + # NB 16S does not require a failed samples report and + # it is not performed by SPP. + self.fsr.write(failed_samples, job.__class__.__name__) + + return failed_samples + + def generate_sequence_counts(self): + # for other instances of generate_sequence_counts in other objects + # the sequence counting needs to be done; however, for PacBio we + # already have done it and just need to merge the results. 
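+        # each count file sits next to its fastq, e.g.
+        #   <ConvertJob>/<project>/<sample>_S000_L00<lane>_R1_001.counts.txt
+        # and holds the single integer written by 'fqtools count' in the
+        # bam2fastq command list.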
+ gz_files = glob(f'{self.raw_fastq_files_path}/*/*.fastq.gz') + data, missing_files = [], [] + + for gzf in gz_files: + cf = gzf.replace('.fastq.gz', '.counts.txt') + sn = basename(cf).replace( + f'_S000_L00{self.lane_number}_R1_001.counts.txt', '') + if not exists(cf): + missing_files.append(sn) + continue + with open(cf, 'r') as fh: + counts = fh.read().strip() + data.append({'Sample_ID': sn, + 'raw_reads_r1r2': counts, + 'Lane': self.lane_number}) + + if missing_files: + raise ValueError(f'Missing count files: {missing_files}') + + df = pd.DataFrame(data) + self.reports_path = join(self.pipeline.output_path, + 'ConvertJob', + 'SeqCounts.csv') + df.to_csv(self.reports_path, index=False) + + def integrate_results(self): + pass diff --git a/src/qp_klp/WorkflowFactory.py b/src/qp_klp/WorkflowFactory.py index 81b6e024..870449e5 100644 --- a/src/qp_klp/WorkflowFactory.py +++ b/src/qp_klp/WorkflowFactory.py @@ -3,6 +3,7 @@ from .StandardMetatranscriptomicWorkflow import \ StandardMetatranscriptomicWorkflow from .TellseqMetagenomicWorkflow import TellSeqMetagenomicWorkflow +from .PacBioMetagenomicWorkflow import PacBioMetagenomicWorkflow from sequence_processing_pipeline.Pipeline import Pipeline from metapool import load_sample_sheet from metapool.sample_sheet import SAMPLE_SHEETS_BY_PROTOCOL as SSBP @@ -14,7 +15,8 @@ class WorkflowFactory(): WORKFLOWS = [StandardMetagenomicWorkflow, StandardMetatranscriptomicWorkflow, StandardAmpliconWorkflow, - TellSeqMetagenomicWorkflow] + TellSeqMetagenomicWorkflow, + PacBioMetagenomicWorkflow] @classmethod def _get_instrument_type(cls, sheet): @@ -76,6 +78,9 @@ def generate_workflow(cls, **kwargs): "sample-sheet or a mapping-file.") for workflow in WorkflowFactory.WORKFLOWS: + if workflow.read_length not in {'short', 'long'}: + raise ValueError('Invalid read_length: ' + f'{workflow.read_length} for {workflow}') if workflow.assay_type == assay_type: if workflow.protocol_type == instrument_type: # return instantiated workflow object diff --git a/src/qp_klp/Workflows.py b/src/qp_klp/Workflows.py index 19ab808b..9155004c 100644 --- a/src/qp_klp/Workflows.py +++ b/src/qp_klp/Workflows.py @@ -8,6 +8,7 @@ import logging from shutil import rmtree from .Assays import ASSAY_NAME_AMPLICON +from metapool.sample_sheet import PROTOCOL_NAME_PACBIO_SMRT from sequence_processing_pipeline.util import determine_orientation @@ -600,6 +601,10 @@ def _get_postqc_fastq_files(self, out_dir, project): if self.pipeline.pipeline_type != ASSAY_NAME_AMPLICON: del (files['raw_barcodes']) + # PacBio doesn't have reverse reads + if self.protocol_type == PROTOCOL_NAME_PACBIO_SMRT: + del (files['raw_reverse_seqs']) + # confirm expected lists of reads are not empty. 
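+        # (for PacBio that now means only the forward-read lists.)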
for f_type in files: if not files[f_type]: @@ -621,8 +626,13 @@ def _load_prep_into_qiita(self, qclient, prep_id, artifact_name, 'prep_id': prep_id, 'artifact_type': atype, 'command_artifact_name': artifact_name, - 'add_default_workflow': True, 'files': dumps(fastq_files)} + # this will block adding the default workflow to the + # long read / pacbio processing; which is desirable + # until we have a processing pipeline - note that + # this will only be added if _not_ 'long' + if self.read_length != 'long': + pdata['add_default_workflow'] = True job_id = qclient.post('/qiita_db/artifact/', data=pdata)['job_id'] diff --git a/src/qp_klp/scripts/pacbio_commands.py b/src/qp_klp/scripts/pacbio_commands.py new file mode 100755 index 00000000..b0ab0717 --- /dev/null +++ b/src/qp_klp/scripts/pacbio_commands.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python + +# ----------------------------------------------------------------------------- +# Copyright (c) 2014--, The Qiita Development Team. +# +# Distributed under the terms of the BSD 3-clause License. +# +# The full license is in the file LICENSE, distributed with this software. +# ----------------------------------------------------------------------------- +import click +import pandas as pd +from glob import glob +from os import makedirs + + +@click.command() +@click.argument('sample_list', required=True) +@click.argument('run_folder', required=True) +@click.argument('outdir', required=True) +@click.argument('threads', required=True, default=1) +def generate_bam2fastq_commands(sample_list, run_folder, outdir, threads): + """Generates the bam2fastq commands""" + df = pd.read_csv(sample_list, sep='\t') + + # pacbio raw files are in a hifi_reads folder, wihtin multiple folders + # (1_A01, 2_A02, ect), within the run-id folder; and are named + # m[run-id]XXX.hifi_reads.[barcode].bam; thus to find the [barcode] we + # can split on '.' and then the second to last element [-2]. + files = {f.split('.')[-2]: f + for f in glob(f'{run_folder}/*/hifi_reads/*.bam')} + + makedirs(outdir, exist_ok=True) + + commands, missing_files = [], [] + for _, row in df.iterrows(): + bc = row['barcode'] + sn = row['sample_name'] + pn = row['project_name'] + lane = row['lane'] + if bc not in files: + missing_files.append(bc) + continue + od = f'{outdir}/{pn}' + + makedirs(od, exist_ok=True) + fn = f'{od}/{sn}_S000_L00{lane}_R1_001' + cmd = (f'bam2fastq -j {threads} -o {fn} -c 9 ' + f'{files[bc]}; ' + f'fqtools count {fn}.fastq.gz > ' + f'{fn}.counts.txt') + commands.append(cmd) + + if missing_files: + raise ValueError( + f'{run_folder} is missing barcodes: {missing_files}') + + for cmd in commands: + print(cmd) diff --git a/src/sequence_processing_pipeline/Commands.py b/src/sequence_processing_pipeline/Commands.py index f2a196b2..9337ba4f 100644 --- a/src/sequence_processing_pipeline/Commands.py +++ b/src/sequence_processing_pipeline/Commands.py @@ -1,17 +1,19 @@ import glob import gzip import os +import click from sequence_processing_pipeline.util import (iter_paired_files, determine_orientation) def split_similar_size_bins(data_location_path, max_file_list_size_in_gb, - batch_prefix): + batch_prefix, allow_fwd_only=False): '''Partitions input fastqs to coarse bins :param data_location_path: Path to the ConvertJob directory. :param max_file_list_size_in_gb: Upper threshold for file-size. :param batch_prefix: Path + file-name prefix for output-files. + :param allow_fwd_only: ignore rev match, helpful for long reads. :return: The number of output-files created, size of largest bin. 
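+    When allow_fwd_only is True each batch line holds only the R1 path and
+    its parent directory name (two tab-separated columns); otherwise the R1
+    path, R2 path and directory name are written (three columns).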
''' # to prevent issues w/filenames like the ones below from being mistaken @@ -37,31 +39,56 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb, bucket_size = 0 max_bucket_size = 0 - for a, b in iter_paired_files(fastq_paths): - r1_size = os.stat(a).st_size - r2_size = os.stat(b).st_size - - output_base = os.path.dirname(a).split('/')[-1] - if current_size + r1_size > max_size: - # bucket is full. - if bucket_size > max_bucket_size: - max_bucket_size = bucket_size - - # reset bucket_size. - bucket_size = r1_size + r2_size - - if fp is not None: - fp.close() - - split_offset += 1 - current_size = r1_size - fp = open(batch_prefix + '-%d' % split_offset, 'w') - else: - # add to bucket_size - bucket_size += r1_size + r2_size - current_size += r1_size - - fp.write("%s\t%s\t%s\n" % (a, b, output_base)) + if allow_fwd_only: + for a in fastq_paths: + r1_size = os.stat(a).st_size + output_base = os.path.dirname(a).split('/')[-1] + if current_size + r1_size > max_size: + # bucket is full. + if bucket_size > max_bucket_size: + max_bucket_size = bucket_size + + # reset bucket_size. + bucket_size = r1_size + + if fp is not None: + fp.close() + + split_offset += 1 + current_size = r1_size + fp = open(batch_prefix + '-%d' % split_offset, 'w') + else: + # add to bucket_size + bucket_size += r1_size + current_size += r1_size + + fp.write("%s\t%s\n" % (a, output_base)) + else: + for a, b in iter_paired_files(fastq_paths): + r1_size = os.stat(a).st_size + r2_size = os.stat(b).st_size + + output_base = os.path.dirname(a).split('/')[-1] + if current_size + r1_size > max_size: + # bucket is full. + if bucket_size > max_bucket_size: + max_bucket_size = bucket_size + + # reset bucket_size. + bucket_size = r1_size + r2_size + + if fp is not None: + fp.close() + + split_offset += 1 + current_size = r1_size + fp = open(batch_prefix + '-%d' % split_offset, 'w') + else: + # add to bucket_size + bucket_size += r1_size + r2_size + current_size += r1_size + + fp.write("%s\t%s\t%s\n" % (a, b, output_base)) if fp is not None: fp.close() @@ -166,3 +193,88 @@ def demux(id_map, fp, out_d, task, maxtask): for d in openfps.values(): for f in d.values(): f.close() + + +@click.group() +def cli(): + pass + + +@cli.command() +@click.option('--id-map', type=click.Path(exists=True), required=True) +@click.option('--infile', type=click.Path(exists=True), required=True) +@click.option('--output', type=click.Path(exists=True), required=True) +@click.option('--task', type=int, required=True) +@click.option('--maxtask', type=int, required=True) +def demux_just_fwd(id_map, infile, output, task, maxtask): + with open(id_map, 'r') as f: + id_map = f.readlines() + id_map = [line.strip().split('\t') for line in id_map] + + # fp needs to be an open file handle. + # ensure task and maxtask are proper ints when coming from cmd-line. 
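+    # typical invocation (mirrors the call in nuqc_job_single.sh):
+    #   demux_just_fwd --id-map ${id_map} --infile <(cat ${seqs_r1}) \
+    #                  --output ${OUTPUT} --task ${idx} --maxtask ${n_demux_jobs}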
+ with open(infile, 'r') as fp: + demux_just_fwd_processing(id_map, fp, output, int(task), int(maxtask)) + + +def demux_just_fwd_processing(id_map, fp, out_d, task, maxtask): + """Split infile data based in provided map""" + delimiter = '::MUX::' + mode = 'wt' + ext = '.fastq.gz' + sep = '/' + rec = '@' + + openfps = {} + + for offset, (idx, r1, outbase) in enumerate(id_map): + if offset % maxtask == task: + idx = rec + idx + + # setup output locations + outdir = out_d + sep + outbase + fullname_r1 = outdir + sep + r1 + ext + + # we have seen in lustre that sometime this line + # can have a raise condition; making sure it doesn't break + # things + try: + os.makedirs(outdir, exist_ok=True) + except FileExistsError: + pass + current_fp_r1 = gzip.open(fullname_r1, mode) + current_fp = {'1': current_fp_r1} + openfps[idx] = current_fp + + # setup a parser + seq_id = iter(fp) + seq = iter(fp) + dumb = iter(fp) + qual = iter(fp) + + # there is only fwd so the orientation is always '1' + orientation = '1' + + for i, s, d, q in zip(seq_id, seq, dumb, qual): + # '@1', 'LH00444:84:227CNHLT4:7:1101:41955:2443/1' + # '@1', 'LH00444:84:227CNHLT4:7:1101:41955:2443/1 BX:Z:TATGACACATGCGGCCCT' # noqa + # '@baz/1 + + # NB: from 6d794a37-12cd-4f8e-95d6-72a4b8a1ec1c's only-adapter-filtered results: # noqa + # @A00953:244:HYHYWDSXY:3:1101:14082:3740 1:N:0:CCGTAAGA+TCTAACGC + + fname_encoded, sid = i.split(delimiter, 1) + + if fname_encoded not in openfps: + continue + + current_fp = openfps[fname_encoded] + + current_fp[orientation].write(f'{rec}{sid}') + current_fp[orientation].write(s) + current_fp[orientation].write(d) + current_fp[orientation].write(q) + + for d in openfps.values(): + for f in d.values(): + f.close() diff --git a/src/sequence_processing_pipeline/ConvertJob.py b/src/sequence_processing_pipeline/ConvertJob.py index 3a521d8c..6fb4967d 100644 --- a/src/sequence_processing_pipeline/ConvertJob.py +++ b/src/sequence_processing_pipeline/ConvertJob.py @@ -27,6 +27,7 @@ def __init__(self, run_dir, output_path, sample_sheet_path, queue_name, :param node_count: The number of nodes to request. :param nprocs: The maximum number of parallel processes to use. :param wall_time_limit: A hard time limit (in min) to bound processing. + :param pmem: Amount of memory to be used. :param bcl_tool_path: The path to either bcl2fastq or bcl-convert. :param modules_to_load: A list of Linux module names to load :param qiita_job_id: identify Torque jobs using qiita_job_id @@ -379,3 +380,120 @@ def _make_msg_base(col_name): # be provided in the destination path. dst_fp = join(self.output_path, dest_project, split(src_fp)[1]) copyfile(src_fp, dst_fp) + + +class ConvertPacBioBam2FastqJob(Job): + def __init__(self, run_dir, output_path, sample_sheet_path, queue_name, + node_count, nprocs, wall_time_limit, pmem, bam2fastq_path, + modules_to_load, qiita_job_id): + """ + ConvertPacBioBam2FastqJob provides a convenient way to run bam2fastq + on a directory PacBio bam files to generate fastq files. + :param run_dir: The 'run' directory that contains BCL files. + :param output_path: Path where all pipeline-generated files live. + :param sample_sheet_path: The path to a sample-sheet. + :param queue_name: The name of the Torque queue to use for processing. + :param node_count: The number of nodes to request. + :param nprocs: The maximum number of parallel processes to use. + :param wall_time_limit: A hard time limit (in min) to bound processing. + :param pmem: Amount of memory to be used. 
+ :param bam2fastq_path: The path to either bcl2fastq or bcl-convert. + :param modules_to_load: A list of Linux module names to load + :param qiita_job_id: identify Torque jobs using qiita_job_id + """ + super().__init__(run_dir, + output_path, + 'ConvertJob', + [bam2fastq_path], + 1000, + modules_to_load=modules_to_load) + + # for metagenomics pipelines, sample_sheet_path will reflect a real + # sample_sheet file. For amplicon pipelines, sample_sheet_path will + # reference a dummy sample_sheet file. + self.sample_sheet_path = sample_sheet_path + self.queue_name = queue_name + self.node_count = node_count + self.nprocs = nprocs + self.wall_time_limit = wall_time_limit + self.pmem = pmem + self.bam2fastq_path = bam2fastq_path + self.qiita_job_id = qiita_job_id + self.job_script_path = join(self.output_path, f"{self.job_name}.sh") + self.suffix = 'fastq.gz' + self.fastq_paths = None + self.info = None + + self._file_check(self.sample_sheet_path) + + self._generate_job_script() + + def _generate_job_script(self): + """ + Generate a Torque job script for processing supplied root_directory. + :return: The path to the newly-created job-script. + """ + threads_per_job = 2 + parallel_jobs = int(self.nprocs/threads_per_job) + + lines = [] + lines.append("#!/bin/bash") + lines.append(f"#SBATCH --job-name {self.qiita_job_id}_{self.job_name}") + lines.append(f"#SBATCH -p {self.queue_name}") + lines.append(f'#SBATCH -N {self.node_count}') + lines.append(f'#SBATCH -n {self.nprocs}') + lines.append("#SBATCH --time %d" % self.wall_time_limit) + + # send an email to the list of users defined below when a job starts, + # terminates, or aborts. This is used to confirm that the package's + # own reporting mechanism is reporting correctly. + lines.append("#SBATCH --mail-type=ALL") + + # list of users to be contacted independently of this package's + # notification system, when a job starts, terminates, or gets aborted. + lines.append("#SBATCH --mail-user qiita.help@gmail.com") + + lines.append(f"#SBATCH --mem-per-cpu {self.pmem}") + + lines.append("set -x") + lines.append('set -e') + lines.append('date') + lines.append('hostname') + lines.append('source ~/.bash_profile') + lines.append('conda activate klp-2025-05') + lines.append(f'cd {self.output_path}') + + lines.append( + f'pacbio_generate_bam2fastq_commands ../sample_list.tsv ' + f'{self.root_dir} {self.output_path} {threads_per_job}' + f' | parallel -j {parallel_jobs}') + + with open(self.job_script_path, 'w') as f: + for line in lines: + # remove long spaces in some lines. + line = re.sub(r'\s+', ' ', line) + f.write(f"{line}\n") + + def run(self, callback=None): + """ + Run pacbio_generate_bam2fastq_commands, which runs bam2fastq + :param callback: optional function taking two parameters (id, status) + that is called when a running process's status is + changed. + :return: + """ + try: + job_info = self.submit_job(self.job_script_path, + exec_from=self.log_path, + callback=callback) + except JobFailedError as e: + # When a job has failed, parse the logs generated by this specific + # job to return a more descriptive message to the user. + info = self.parse_logs() + # prepend just the message component of the Error. 
+ info.insert(0, str(e)) + raise JobFailedError('\n'.join(info)) + + self.mark_job_completed() + + logging.info(f'Successful job: {job_info}') diff --git a/src/sequence_processing_pipeline/FastQCJob.py b/src/sequence_processing_pipeline/FastQCJob.py index 9d34cb04..27a1c7c4 100644 --- a/src/sequence_processing_pipeline/FastQCJob.py +++ b/src/sequence_processing_pipeline/FastQCJob.py @@ -2,6 +2,7 @@ from jinja2 import Environment from json import dumps import logging +from itertools import zip_longest from os import listdir, makedirs from os.path import join, basename from re import sub @@ -156,15 +157,16 @@ def _scan_fastq_files(self, is_raw_input=False): # gather a list of names from the outputs of previous jobs. project_names = [] - for proj_name, fltr_type, fastq_fp, fwd_files, rev_files in projects: + for proj_name, fltr_type, _, fwd_files, rev_files in projects: project_names.append(proj_name) p_path = partial(join, self.output_path, 'fastqc', proj_name) output_dir = p_path('bclconvert' if is_raw_input else fltr_type) makedirs(output_dir, exist_ok=True) - for some_fwd_file, some_rev_file in zip(fwd_files, rev_files): - fastqc_results.append((some_fwd_file, some_rev_file, - output_dir)) + for sff, srf in zip_longest(fwd_files, rev_files): + if srf is None: + srf = '' + fastqc_results.append((sff, srf, output_dir)) # remove duplicates project_names = list(set(project_names)) diff --git a/src/sequence_processing_pipeline/NuQCJob.py b/src/sequence_processing_pipeline/NuQCJob.py index 7273473e..8e1a31f6 100644 --- a/src/sequence_processing_pipeline/NuQCJob.py +++ b/src/sequence_processing_pipeline/NuQCJob.py @@ -24,7 +24,8 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path, samtools_path, modules_to_load, qiita_job_id, max_array_length, known_adapters_path, movi_path, gres_value, pmls_path, additional_fastq_tags, bucket_size=8, - length_limit=100, cores_per_task=4, files_regex='SPP'): + length_limit=100, cores_per_task=4, files_regex='SPP', + read_length='short'): """ Submit a slurm job where the contents of fastq_root_dir are processed using fastp, minimap2, and samtools. Human-genome sequences will be @@ -52,6 +53,7 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path, :param additional_fastq_tags: A list of fastq tags to preserve during filtering. :param files_regex: the FILES_REGEX to use for parsing files + :param read_length: string defining the read length: long/short. 
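+            'long' (e.g. PacBio HiFi) switches minimap2 to map-hifi, drops
+            the paired-read handling and selects the single-end
+            nuqc_job_single.sh template; 'short' keeps the original
+            Illumina behaviour.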
""" super().__init__(fastq_root_dir, output_path, @@ -59,6 +61,7 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path, [fastp_path, minimap2_path, samtools_path], max_array_length, modules_to_load=modules_to_load) + self.read_length = read_length self.sample_sheet_path = sample_sheet_path self._file_check(self.sample_sheet_path) metadata = self._process_sample_sheet() @@ -118,10 +121,12 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path, def _validate_project_data(self): # Validate project settings in [Bioinformatics] for project in self.project_data: - if project['ForwardAdapter'] == 'NA': + if 'ForwardAdapter' not in project or \ + project['ForwardAdapter'] == 'NA': project['ForwardAdapter'] = None - if project['ReverseAdapter'] == 'NA': + if 'ReverseAdapter' not in project or \ + project['ReverseAdapter'] == 'NA': project['ReverseAdapter'] = None if project['ForwardAdapter'] is None: @@ -151,15 +156,22 @@ def _filter_empty_fastq_files(self, filtered_directory, files = glob(join(filtered_directory, f'*.{self.suffix}')) - for r1, r2 in iter_paired_files(files): - full_path = join(filtered_directory, r1) - full_path_reverse = join(filtered_directory, r2) - if stat(full_path).st_size <= minimum_bytes or stat( - full_path_reverse).st_size <= minimum_bytes: - logging.debug(f'moving {full_path} and {full_path_reverse}' - f' to empty list.') - empty_list.append(full_path) - empty_list.append(full_path_reverse) + if self.read_length == 'long': + for r1 in files: + full_path = join(filtered_directory, r1) + if stat(full_path).st_size <= minimum_bytes: + logging.debug(f'moving {full_path} to empty list.') + empty_list.append(full_path) + else: + for r1, r2 in iter_paired_files(files): + full_path = join(filtered_directory, r1) + full_path_reverse = join(filtered_directory, r2) + if stat(full_path).st_size <= minimum_bytes or stat( + full_path_reverse).st_size <= minimum_bytes: + logging.debug(f'moving {full_path} and {full_path_reverse}' + f' to empty list.') + empty_list.append(full_path) + empty_list.append(full_path_reverse) if empty_list: logging.debug(f'making directory {empty_files_directory}') @@ -234,9 +246,9 @@ def run(self, callback=None): batch_count = 0 max_size = 0 else: - batch_count, max_size = split_similar_size_bins(self.root_dir, - self.bucket_size, - batch_location) + batch_count, max_size = split_similar_size_bins( + self.root_dir, self.bucket_size, batch_location, + self.read_length == 'long') job_script_path = self._generate_job_script(max_size) @@ -364,7 +376,7 @@ def _process_sample_sheet(self): raise PipelineError(s) header = sheet.Header - chemistry = header['chemistry'] + chemistry = header.get('chemistry', '') if header['Assay'] not in Pipeline.assay_types: s = "Assay value '%s' is not recognized." 
% header['Assay'] @@ -372,7 +384,8 @@ def _process_sample_sheet(self): sample_ids = [] for sample in sheet.samples: - sample_ids.append((sample['Sample_ID'], sample['Sample_Project'])) + sample_ids.append( + (sample['Sample_ID'], sample['Sample_Project'])) bioinformatics = sheet.Bioinformatics @@ -412,6 +425,17 @@ def _generate_mmi_filter_cmds(self, working_dir): tags = " -T %s" % ','.join(self.additional_fastq_tags) t_switch = " -y" + if self.read_length == 'short': + minimap2_prefix = f'minimap2 -2 -ax sr{t_switch}' + minimap2_subfix = '-a' + samtools_params = '-f 12 -F 256' + elif self.read_length == 'long': + minimap2_prefix = 'minimap2 -2 -ax map-hifi' + minimap2_subfix = '--no-pairing' + samtools_params = '-f 4 -F 256' + else: + raise ValueError(f'minimap2 prefix not set for {self.read_length}') + for count, mmi_db_path in enumerate(self.mmi_file_paths): if count == 0: # prime initial state with unfiltered file and create first of @@ -427,9 +451,10 @@ def _generate_mmi_filter_cmds(self, working_dir): input = tmp_file1 output = tmp_file2 - cmds.append(f"minimap2 -2 -ax sr{t_switch} -t {cores_to_allocate} " - f"{mmi_db_path} {input} -a | samtools fastq -@ " - f"{cores_to_allocate} -f 12 -F 256{tags} > " + cmds.append(f"{minimap2_prefix} -t {cores_to_allocate} " + f"{mmi_db_path} {input} {minimap2_subfix} | " + "samtools fastq -@ " + f"{cores_to_allocate} {samtools_params}{tags} > " f"{output}") # rename the latest tmp file to the final output filename. @@ -448,7 +473,6 @@ def _generate_job_script(self, max_bucket_size): return None job_script_path = join(self.output_path, 'process_all_fastq_files.sh') - template = self.jinja_env.get_template("nuqc_job.sh") job_name = f'{self.qiita_job_id}_{self.job_name}' @@ -473,6 +497,16 @@ def _generate_job_script(self, max_bucket_size): # files can be created. (${jobd}) mmi_filter_cmds = self._generate_mmi_filter_cmds("${jobd}") + if self.read_length == 'short': + pmls_extra_parameters = '' + template = self.jinja_env.get_template("nuqc_job.sh") + elif self.read_length == 'long': + pmls_extra_parameters = '"max" 21' + template = self.jinja_env.get_template("nuqc_job_single.sh") + else: + raise ValueError( + f'pmls_extra_parameters not set for {self.read_length}') + with open(job_script_path, mode="w", encoding="utf-8") as f: # the job resources should come from a configuration file @@ -480,31 +514,33 @@ def _generate_job_script(self, max_bucket_size): # processing begins. mtl = ' '.join(self.modules_to_load) - f.write(template.render(job_name=job_name, - queue_name=self.queue_name, - # should be 4 * 24 * 60 = 4 days - wall_time_limit=self.wall_time_limit, - mem_in_gb=self.jmem, - # Note NuQCJob now maps node_count to - # SLURM -N parameter to act like other - # Job classes. 
- # self.node_count should be 1 - node_count=self.node_count, - # cores-per-task (-c) should be 4 - cores_per_task=self.cores_per_task, - knwn_adpt_path=self.known_adapters_path, - output_path=self.output_path, - html_path=html_path, - json_path=json_path, - demux_path=demux_path, - temp_dir=self.temp_dir, - splitter_binary=splitter_binary, - modules_to_load=mtl, - length_limit=self.length_limit, - gres_value=self.gres_value, - movi_path=self.movi_path, - mmi_filter_cmds=mmi_filter_cmds, - pmls_path=self.pmls_path)) + f.write(template.render( + job_name=job_name, + queue_name=self.queue_name, + # should be 4 * 24 * 60 = 4 days + wall_time_limit=self.wall_time_limit, + mem_in_gb=self.jmem, + # Note NuQCJob now maps node_count to + # SLURM -N parameter to act like other + # Job classes. + # self.node_count should be 1 + node_count=self.node_count, + # cores-per-task (-c) should be 4 + cores_per_task=self.cores_per_task, + knwn_adpt_path=self.known_adapters_path, + output_path=self.output_path, + html_path=html_path, + json_path=json_path, + demux_path=demux_path, + temp_dir=self.temp_dir, + splitter_binary=splitter_binary, + modules_to_load=mtl, + length_limit=self.length_limit, + gres_value=self.gres_value, + movi_path=self.movi_path, + mmi_filter_cmds=mmi_filter_cmds, + pmls_extra_parameters=pmls_extra_parameters, + pmls_path=self.pmls_path)) return job_script_path diff --git a/src/sequence_processing_pipeline/Pipeline.py b/src/sequence_processing_pipeline/Pipeline.py index 4fa5c6b2..d0060d6b 100644 --- a/src/sequence_processing_pipeline/Pipeline.py +++ b/src/sequence_processing_pipeline/Pipeline.py @@ -3,6 +3,7 @@ from json.decoder import JSONDecodeError from os import makedirs, listdir, walk from os.path import join, exists, isdir, basename +from glob import glob from metapool import (load_sample_sheet, AmpliconSampleSheet, is_blank, parse_project_name, SAMPLE_NAME_KEY, QIITA_ID_KEY, PROJECT_SHORT_NAME_KEY, PROJECT_FULL_NAME_KEY, @@ -30,14 +31,17 @@ class InstrumentUtils(): @staticmethod def _get_instrument_id(run_directory): run_info = join(run_directory, 'RunInfo.xml') + instrument_text = 'Run/Instrument' + # if RunInfo.xml doesn't exist, this might be a PacBio + # folder, let's check for it if not exists(run_info): - raise ValueError(f"'{run_info}' doesn't exist") + run = InstrumentUtils._get_pacbio_run_str(run_info, run_directory) + text = run.attrib['TimeStampedName'] + else: + text = ET.parse(run_info).find(instrument_text).text - with open(run_info) as f: - # Instrument element should appear in all valid RunInfo.xml - # files. 
- return ET.fromstring(f.read()).find('Run/Instrument').text + return text @staticmethod def get_instrument_type(run_directory): @@ -45,17 +49,32 @@ def get_instrument_type(run_directory): return get_model_by_instrument_id( instrument_id, model_key=PROFILE_NAME_KEY) + @staticmethod + def _get_pacbio_run_str(run_info, run_directory): + pacbio_metadata_fps = glob( + f'{run_directory}/*/metadata/*.metadata.xml') + if not pacbio_metadata_fps: + raise ValueError(f"'{run_info}' doesn't exist") + + run_info = pacbio_metadata_fps[0] + tree = ET.parse(run_info) + mtag = tree.getroot().tag.split('}')[0] + '}' + instrument_text = ( + f'{mtag}ExperimentContainer/{mtag}Runs/{mtag}Run') + run = ET.parse(run_info).find(instrument_text) + return run + @staticmethod def _get_date(run_directory): run_info = join(run_directory, 'RunInfo.xml') + # if RunInfo.xml doesn't exist, this might be a PacBio + # folder, let's check for it if not exists(run_info): - raise ValueError(f"'{run_info}' doesn't exist") - - with open(run_info) as f: - # Date element should appear in all valid RunInfo.xml - # files. - date_string = ET.fromstring(f.read()).find('Run/Date').text + run = InstrumentUtils._get_pacbio_run_str(run_info, run_directory) + date_string = run.attrib['CreatedAt'].split('T')[0] + else: + date_string = ET.parse(run_info).find('Run/Date').text # date is recorded in RunInfo.xml in various formats. Iterate # through all known formats until the date is properly parsed. @@ -64,7 +83,7 @@ def _get_date(run_directory): # UTC time w/out confirming the machines were actually set for # and/or reporting UTC time. formats = ["%y%m%d", "%Y-%m-%dT%H:%M:%s", "%Y-%m-%dT%H:%M:%SZ", - "%m/%d/%Y %I:%M:%S %p"] + "%m/%d/%Y %I:%M:%S %p", "%Y-%m-%d",] for format in formats: try: @@ -244,22 +263,6 @@ def __init__(self, configuration_file_path, run_id, input_file_path, raise PipelineError(f"A run-dir for '{self.run_id}' could not be " "found") - # required files for successful operation - # both RTAComplete.txt and RunInfo.xml should reside in the root of - # the run directory. - required_files = ['RTAComplete.txt', 'RunInfo.xml'] - for some_file in required_files: - if not exists(join(self.run_dir, some_file)): - raise PipelineError(f"required file '{some_file}' is not " - f"present in {self.run_dir}.") - - # verify that RunInfo.xml file is readable. - try: - fp = open(join(self.run_dir, 'RunInfo.xml')) - fp.close() - except PermissionError: - raise PipelineError('RunInfo.xml is present, but not readable') - self.input_file_path = input_file_path if pipeline_type == Pipeline.AMPLICON_PTYPE: @@ -680,23 +683,13 @@ def generate_sample_info_files(self, dereplicated_input_file_paths): controls_in_proj_df['description'] = controls_in_proj_df[TEMP_KEY] controls_in_proj_df.drop(columns=[TEMP_KEY], inplace=True) controls_in_proj_df['collection_timestamp'] = \ - self.get_date_from_run_id() + InstrumentUtils._get_date(self.run_dir) controls_in_proj_df = controls_in_proj_df[Pipeline.sif_header] controls_in_proj_df.to_csv(curr_fp, sep='\t', index=False) return paths - def get_date_from_run_id(self): - # assume all run_ids begin with coded datestamp: - # 210518_... - # allow exception if substrings cannot convert to int - # or if array indexes are out of bounds. 
- year = int(self.run_id[0:2]) + 2000 - month = int(self.run_id[2:4]) - day = int(self.run_id[4:6]) - return f'{year}-{month}-{day}' - def get_sample_ids(self): ''' Returns list of sample-ids sourced from sample-sheet or pre-prep file diff --git a/src/sequence_processing_pipeline/templates/nuqc_job.sh b/src/sequence_processing_pipeline/templates/nuqc_job.sh index 7ada81f4..962bba24 100644 --- a/src/sequence_processing_pipeline/templates/nuqc_job.sh +++ b/src/sequence_processing_pipeline/templates/nuqc_job.sh @@ -125,10 +125,10 @@ function mux-runner () { --index /scratch/movi_hg38_chm13_hprc94 \ --read ${seq_reads_filter_alignment} \ --stdout | gzip > ${jobd}/seqs.movi.txt.gz - - python {{pmls_path}} <(zcat ${jobd}/seqs.movi.txt.gz) | \ + + python {{pmls_path}} <(zcat ${jobd}/seqs.movi.txt.gz) {{pmls_extra_parameters}} | \ seqtk subseq ${seq_reads_filter_alignment} - > ${jobd}/seqs.final.fastq - + {{splitter_binary}} ${jobd}/seqs.final.fastq \ ${jobd}/reads.r1.fastq ${delimiter} ${r1_tag} & {{splitter_binary}} ${jobd}/seqs.final.fastq \ diff --git a/src/sequence_processing_pipeline/templates/nuqc_job_single.sh b/src/sequence_processing_pipeline/templates/nuqc_job_single.sh new file mode 100644 index 00000000..d06ac833 --- /dev/null +++ b/src/sequence_processing_pipeline/templates/nuqc_job_single.sh @@ -0,0 +1,167 @@ +#!/bin/bash -l +#SBATCH -J {{job_name}} +#SBATCH -p {{queue_name}} +### wall-time-limit in minutes +#SBATCH --time {{wall_time_limit}} +#SBATCH --mem {{mem_in_gb}}G +#SBATCH -N {{node_count}} +### Note cores_per_task maps to fastp & minimap2 thread counts +### as well as sbatch -c. demux threads remains fixed at 1. +### Note -c set to 4 and thread counts set to 7 during testing. +#SBATCH -c {{cores_per_task}} +### Commented out for now, but there is a possibility it will be needed +### in the future. +###SBATCH --gres=node_jobs:{{gres_value}} +#SBATCH --constraint="amd" + + +echo "---------------" +echo "Run details:" +echo "$SLURM_JOB_NAME $SLURM_JOB_ID $SLURMD_NODENAME $SLURM_ARRAY_TASK_ID" +echo "---------------" + +if [[ -z "${SLURM_ARRAY_TASK_ID}" ]]; then + echo "Not operating within an array" + exit 1 +fi +if [[ -z ${PREFIX} ]]; then + echo "PREFIX is not set" + exit 1 +fi +if [[ -z ${OUTPUT} ]]; then + echo "OUTPUT is not set" + exit 1 +fi + +conda activate klp-2025-05 +module load {{modules_to_load}} + +set -x +set -e +set -o pipefail + +export FILES=$(printf "%s-%d" ${PREFIX} ${SLURM_ARRAY_TASK_ID}) +if [[ ! -f ${FILES} ]]; then + logger ${FILES} not found + exit 1 +fi +# set a temp directory, make a new unique one under it and +# make sure we clean up as we're dumping to shm +# DO NOT do this casually. Only do a clean up like this if +# you know for sure TMPDIR is what you want. 
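+# (TMPDIR is first pointed at OUTPUT and then replaced by a fresh mktemp -d
+# directory; the EXIT trap below removes only that per-task directory.)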
+ +WKDIR=${OUTPUT}/ +TMPDIR=${OUTPUT} +export TMPDIR=${TMPDIR} +export TMPDIR=$(mktemp -d) +echo $TMPDIR + +mkdir -p ${WKDIR}/fastp_reports_dir/html +mkdir -p ${WKDIR}/fastp_reports_dir/json + +export ADAPTER_ONLY_OUTPUT=${OUTPUT}/only-adapter-filtered +mkdir -p ${ADAPTER_ONLY_OUTPUT} + +function cleanup { + echo "Removing $TMPDIR" + rm -fr $TMPDIR + unset TMPDIR +} +trap cleanup EXIT + +export delimiter=::MUX:: +export r1_tag=/1 +function mux-runner () { + n=$(wc -l ${FILES} | cut -f 1 -d" ") + + jobd=${TMPDIR} + id_map=${jobd}/id_map + seqs_reads=${jobd}/seqs.interleaved.fastq + seq_reads_filter_alignment=${jobd}/seqs.interleaved.filter_alignment.fastq + + for i in $(seq 1 ${n}) + do + line=$(head -n ${i} ${FILES} | tail -n 1) + r1=$(echo ${line} | cut -f 1 -d" ") + base=$(echo ${line} | cut -f 2 -d" ") + r1_name=$(basename ${r1} .fastq.gz) + r_adapter_only=${ADAPTER_ONLY_OUTPUT}/${r1_name}.interleave.fastq.gz + + s_name=$(basename "${r1}" | sed -r 's/\.fastq\.gz//') + html_name=$(echo "$s_name.html") + json_name=$(echo "$s_name.json") + + echo -e "${i}\t${r1_name}\t${base}" >> ${id_map} + + # movi, in the current version, works on the interleaved version of the + # fwd/rev reads so we are gonna take advantage fastp default output + # to minimize steps. Additionally, movi expects the input to not be + # gz, so we are not going to compress seqs_r1 + + fastp \ + -l {{length_limit}} \ + -i ${r1} \ + -w {{cores_per_task}} \ + --adapter_fasta {{knwn_adpt_path}} \ + --html {{html_path}}/${html_name} \ + --json {{json_path}}/${json_name} \ + --stdout | gzip > ${r_adapter_only} + + # multiplex and write adapter filtered data all at once + zcat ${r_adapter_only} | \ + sed -r "1~4s/^@(.*)/@${i}${delimiter}\1/" \ + >> ${seqs_reads} + done + + # minimap/samtools pair commands are now generated in NuQCJob._generate_mmi_filter_cmds() + # and passed to this template. + {{mmi_filter_cmds}} + + {{movi_path}} query \ + --index /scratch/movi_hg38_chm13_hprc94 \ + --read ${seq_reads_filter_alignment} \ + --stdout | gzip > ${jobd}/seqs.movi.txt.gz + + python {{pmls_path}} <(zcat ${jobd}/seqs.movi.txt.gz) {{pmls_extra_parameters}} | \ + seqtk subseq ${seq_reads_filter_alignment} - > ${jobd}/seqs.final.fastq + + # keep seqs.movi.txt and migrate it to NuQCJob directory. + mv ${jobd}/seqs.movi.txt.gz {{output_path}}/logs/seqs.movi.${SLURM_ARRAY_TASK_ID}.txt.gz +} +export -f mux-runner + + +function demux-runner () { + n_demux_jobs=${SLURM_CPUS_PER_TASK} + jobd=${TMPDIR} + id_map=${jobd}/id_map + seqs_r1=${jobd}/seqs.final.fastq + + id_map=${jobd}/id_map + if [[ ! -f ${id_map} ]]; then + echo "No samples..." 
+ return + fi + + for idx in $(seq 0 ${n_demux_jobs}) + do + demux_just_fwd \ + --id-map ${id_map} \ + --infile <(cat ${seqs_r1}) \ + --output ${OUTPUT} \ + --task ${idx} \ + --maxtask ${n_demux_jobs} & + done + wait +} +export -f demux-runner + +mux-runner + +mkdir -p ${OUTPUT} + +echo "$(date) :: demux start" +demux-runner +echo "$(date) :: demux stop" + +touch ${OUTPUT}/${SLURM_JOB_NAME}.${SLURM_ARRAY_TASK_ID}.completed diff --git a/src/sequence_processing_pipeline/util.py b/src/sequence_processing_pipeline/util.py index f38b38ee..bacf8306 100644 --- a/src/sequence_processing_pipeline/util.py +++ b/src/sequence_processing_pipeline/util.py @@ -4,8 +4,8 @@ PAIR_UNDERSCORE = (REC(r'_R1_'), '_R1_', '_R2_') PAIR_DOT = (REC(r'\.R1\.'), '.R1.', '.R2.') -SIMPLE_PAIR_UNDERSCAOTE = (REC(r'_R1'), '_R1', '_R2') -PAIR_TESTS = (PAIR_UNDERSCORE, PAIR_DOT, SIMPLE_PAIR_UNDERSCAOTE) +SIMPLE_PAIR_UNDERSCORE = (REC(r'_R1'), '_R1', '_R2') +PAIR_TESTS = (PAIR_UNDERSCORE, PAIR_DOT, SIMPLE_PAIR_UNDERSCORE) FILES_REGEX = { 'SPP': { 'fastq': REC(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}\.fastq\.gz$'), diff --git a/tests/data/configuration_profiles/pacbio_metagenomic.json b/tests/data/configuration_profiles/pacbio_metagenomic.json new file mode 100644 index 00000000..6cd9fc95 --- /dev/null +++ b/tests/data/configuration_profiles/pacbio_metagenomic.json @@ -0,0 +1,64 @@ +{ + "profile": { + "instrument_type": "Revio", + "assay_type": "Metagenomic", + "configuration": { + "pacbio_convert": { + "nodes": 1, + "nprocs": 28, + "queue": "qiita", + "wallclock_time_in_minutes": 216, + "modules_to_load": [], + "executable_path": "bam2fastq", + "per_process_memory_limit": "1gb" + }, + "nu-qc": { + "nodes": 1, + "cpus_per_task": 8, + "queue": "qiita", + "wallclock_time_in_minutes": 2440, + "minimap2_databases": [ + "/databases/minimap2/db_1.mmi", + "/databases/minimap2/db_2.mmi" + ], + "modules_to_load": [ + "gcc_9.3.0", + "fastp_0.20.1", + "samtools_1.12", + "minimap2_2.28" + ], + "fastp_executable_path": "fastp", + "minimap2_executable_path": "minimap2", + "samtools_executable_path": "samtools", + "job_total_memory_limit": "110", + "job_max_array_length": 1000, + "known_adapters_path": "fastp_known_adapters_formatted.fna", + "bucket_size": 18, + "length_limit": 100, + "cores_per_task": 16, + "movi_executable_path": "/home/user/user_dir/Movi/build/movi-default", + "gres_value": 4, + "pmls_path": "/home/user/user_dir/human_host_filtration/scripts/qiita_filter_pmls.py", + "additional_fastq_tags": [] + }, + "seqpro": { + "seqpro_path": "seqpro", + "modules_to_load": [] + }, + "fastqc": { + "nodes": 1, + "nprocs": 16, + "queue": "qiita", + "nthreads": 16, + "wallclock_time_in_minutes": 60, + "modules_to_load": ["fastqc_0.12.1"], + "fastqc_executable_path": "fastqc", + "multiqc_executable_path": "multiqc", + "multiqc_config_file_path": "sequence_processing_pipeline/multiqc-bclconvert-config.yaml", + "job_total_memory_limit": "20gb", + "job_pool_size": 30, + "job_max_array_length": 1000 + } + } + } +} diff --git a/tests/data/r11111_20250101_111111/1_A01/metadata/m11111_20250101_111111_s4.metadata.xml b/tests/data/r11111_20250101_111111/1_A01/metadata/m11111_20250101_111111_s4.metadata.xml new file mode 100644 index 00000000..8bc3a16c --- /dev/null +++ b/tests/data/r11111_20250101_111111/1_A01/metadata/m11111_20250101_111111_s4.metadata.xml @@ -0,0 +1,11 @@ + + + + Default + 2025-01-01 + + + + + + diff --git a/tests/data/sample-sheets/metagenomic/pacbio/good_pacbio_metagv10.csv 
b/tests/data/sample-sheets/metagenomic/pacbio/good_pacbio_metagv10.csv new file mode 100644 index 00000000..c18e8f33 --- /dev/null +++ b/tests/data/sample-sheets/metagenomic/pacbio/good_pacbio_metagv10.csv @@ -0,0 +1,27 @@ +[Header],,,,,,, +SheetType,pacbio_metag,,,,,, +SheetVersion,10,,,,,, +Investigator Name,Knight,,,,,, +Experiment Name,RKL_experiment,,,,,, +Date,2025-08-29,,,,,, +Assay,Metagenomic,,,,,, +Description,,,,,,, +,,,,,,, +[Data],,,,,,, +Sample_ID,Sample_Name,Sample_Plate,Sample_Well,barcode_id,Sample_Project,Well_description,Lane +3A,3A,sample_plate_1,A1,bc3011,MyProject_6123,sample_plate_1.3A.A1,1 +4A,4A,sample_plate_1,A2,bc0112,MyProject_6123,sample_plate_1.4A.A2,1 +5B,5B,sample_plate_1,A3,bc9992,MyProject_6123,sample_plate_1.5B.A3,1 +,,,,,,, +[Bioinformatics],,,,,,, +Sample_Project,QiitaID,HumanFiltering,library_construction_protocol,experiment_design_description,contains_replicates,, +MyProject_6123,6123,False,some protocol,some description,False,, +,,,,,,, +[Contact],,,,,,, +Sample_Project,Email,,,,,, +MyProject_6123,foo@bar.org,,,,,, +,,,,,,, +[SampleContext],,,,,,, +sample_name,sample_type,primary_qiita_study,secondary_qiita_studies,,,, +5B,control blank,6123,,,,, +,,,,,,, diff --git a/tests/data/sample_run_directories/r11111_20250101_111111/1_A01/metadata/m11111_20250101_111111_s4.metadata.xml b/tests/data/sample_run_directories/r11111_20250101_111111/1_A01/metadata/m11111_20250101_111111_s4.metadata.xml new file mode 100644 index 00000000..8bc3a16c --- /dev/null +++ b/tests/data/sample_run_directories/r11111_20250101_111111/1_A01/metadata/m11111_20250101_111111_s4.metadata.xml @@ -0,0 +1,11 @@ + + + + Default + 2025-01-01 + + + + + + diff --git a/tests/test_PacBioWorflow.py b/tests/test_PacBioWorflow.py new file mode 100644 index 00000000..bc1f1e46 --- /dev/null +++ b/tests/test_PacBioWorflow.py @@ -0,0 +1,136 @@ +# ----------------------------------------------------------------------------- +# Copyright (c) 2014--, The Qiita Development Team. +# +# Distributed under the terms of the BSD 3-clause License. +# +# The full license is in the file LICENSE, distributed with this software. 
+# ----------------------------------------------------------------------------- +from qp_klp.WorkflowFactory import WorkflowFactory +from unittest import main +from os import makedirs +from os.path import dirname, abspath, join, exists +from shutil import copyfile +from shutil import rmtree +from pathlib import Path +import pandas as pd +from metapool.sample_sheet import PROTOCOL_NAME_PACBIO_SMRT +from qp_klp.Assays import ASSAY_NAME_METAGENOMIC +from qiita_client.testing import PluginTestCase + + +class WorkflowFactoryTests(PluginTestCase): + def setUp(self): + self.base_dir = dirname(abspath(__file__)) + self.output_dir = join(self.base_dir, 'test_output') + self.remove_these = [self.output_dir] + + self.gz_source = f'{self.base_dir}/data/dummy.fastq.gz' + + def tearDown(self): + for fp in self.remove_these: + if exists(fp): + rmtree(fp) + + def _create_directory(self, fp): + # just making sure that we always start with a clean folder: + if exists(fp): + rmtree(fp) + makedirs(fp) + self.remove_these.append(fp) + + def _inject_data(self, wf): + '''This is a helper method for testing that all the steps are run + when testing wf.execute_pipeline() + ''' + samples = wf.pipeline.sample_sheet.samples + + convert_dir = f'{self.output_dir}/ConvertJob' + reports_dir = f'{self.output_dir}/ConvertJob/Reports' + makedirs(convert_dir, exist_ok=True) + makedirs(reports_dir, exist_ok=True) + Path(f'{convert_dir}/job_completed').touch() + nuqc_dir = f'{self.output_dir}/NuQCJob' + fastqc_dir = f'{self.output_dir}/FastQCJob/logs/' + multiqc_dir = f'{self.output_dir}/MultiQCJob/logs/' + genprep_dir = (f'{self.output_dir}/GenPrepFileJob/' + 'r11111_20250101_111111/') + makedirs(nuqc_dir, exist_ok=True) + makedirs(fastqc_dir, exist_ok=True) + makedirs(multiqc_dir, exist_ok=True) + makedirs(genprep_dir, exist_ok=True) + # now let's create the required project folders + for project in wf.pipeline.get_project_info(): + sp = project['project_name'] + makedirs(f'{convert_dir}/{sp}', exist_ok=True) + makedirs(f'{nuqc_dir}/filtered_sequences/{sp}', exist_ok=True) + makedirs(f'{genprep_dir}/{sp}/filtered_sequences/', + exist_ok=True) + + # then loop over samples and stage all fastq.gz files + dstats = [] + for sample in samples: + sn = sample['Sample_ID'] + sp = sample["Sample_Project"] + dstats.append({'SampleID': sn, '# Reads': 2}) + dname = f'{convert_dir}/{sp}' + copyfile(self.gz_source, + f'{dname}/{sn}_S000_L001_R1_001.fastq.gz') + with open(f'{dname}/{sn}_S000_L001_R1_001.counts.txt', 'w') as f: + f.write("2") + + # NuQCJob + dname = f'{nuqc_dir}/filtered_sequences/{sp}' + copyfile(self.gz_source, + f'{dname}/{sn}_S000_L001_R1_001.trimmed.fastq.gz') + + # GenPrepFileJob + gprep_base = f'{genprep_dir}/{sp}/filtered_sequences/{sn}' + copyfile(self.gz_source, + f'{gprep_base}_S000_L001_R1_001.trimmed.fastq.gz') + + pd.DataFrame(dstats).set_index('SampleID').to_csv( + f'{reports_dir}/Demultiplex_Stats.csv') + + # generating the "*.completed" files + for i in range(len(samples)*3): + Path(f'{fastqc_dir}/FastQCJob_{i}.completed').touch() + Path(f'{multiqc_dir}/MultiQCJob_{i}.completed').touch() + + def test_pacbio_metagenomic_workflow_creation(self): + kwargs = {"uif_path": "tests/data/sample-sheets/metagenomic/" + "pacbio/good_pacbio_metagv10.csv", + "qclient": self.qclient, + "lane_number": "1", + "config_fp": "tests/configuration.json", + "run_identifier": "r11111_20250101_111111", + "output_dir": self.output_dir, + "job_id": "78901", + "is_restart": False + } + + self._create_directory(kwargs['output_dir']) + 
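+        # the factory is expected to select PacBioMetagenomicWorkflow from
+        # the pacbio_metag sample-sheet plus the Revio run-directory metadata.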
+ wf = WorkflowFactory.generate_workflow(**kwargs) + + # confirm that the proper type of workflow was generated. + self.assertEqual(wf.protocol_type, PROTOCOL_NAME_PACBIO_SMRT) + self.assertEqual(wf.assay_type, ASSAY_NAME_METAGENOMIC) + + self._inject_data(wf) + + # we can add some checks/tests of the initial scripts (mainly Convert) + # but not doing now as is not required + + # Metagenomic is a valid data type in the default qiita test + # database but job-id: 78901 doesn't exist; however, if we get + # to here, it means that all steps have run to completion + # and the system is trying to create the preps. + # Note: Qiita job_id's are UUID in the database and this tests + # uses 78901 as the job_id so the db complains about the format + with self.assertRaisesRegex(RuntimeError, 'invalid input ' + 'syntax for type uuid: "78901"'): + wf.execute_pipeline() + + +if __name__ == '__main__': + main() diff --git a/tests/test_Pipeline.py b/tests/test_Pipeline.py index f1f69e2f..d27d73f2 100644 --- a/tests/test_Pipeline.py +++ b/tests/test_Pipeline.py @@ -207,43 +207,6 @@ def test_get_orig_names_from_sheet_with_replicates(self): self.assertEqual(set(obs), exp) - def test_required_file_checks(self): - # begin this test by deleting the RunInfo.txt file and verifying that - # Pipeline object will raise an Error. - self.delete_runinfo_file() - - with self.assertRaisesRegex(PipelineError, "required file 'RunInfo.xml" - "' is not present."): - Pipeline(self.good_config_file, self.good_run_id, - self.good_sample_sheet_path, - self.output_file_path, - self.qiita_id, Pipeline.METAGENOMIC_PTYPE) - - # delete RTAComplete.txt and recreate RunInfo.txt file to verify that - # an Error is raised when only RTAComplete.txt is missing. - self.delete_rtacomplete_file() - self.create_runinfo_file() - - with self.assertRaisesRegex(PipelineError, "required file 'RTAComplete" - ".txt' is not present."): - Pipeline(self.good_config_file, self.good_run_id, - self.good_sample_sheet_path, - self.output_file_path, - self.qiita_id, Pipeline.METAGENOMIC_PTYPE) - - # make RunInfo.xml file unreadable and verify that Pipeline object - # raises the expected Error. 
- self.create_rtacomplete_file() - self.make_runinfo_file_unreadable() - - with self.assertRaisesRegex(PipelineError, "RunInfo.xml is present, bu" - "t not readable"): - Pipeline(self.good_config_file, self.good_run_id, - self.good_sample_sheet_path, - self.output_file_path, - self.qiita_id, Pipeline.METAGENOMIC_PTYPE) - self.make_runinfo_file_readable() - def test_creation(self): # Pipeline should assert due to config_file with self.assertRaises(PipelineError) as e: @@ -437,7 +400,7 @@ def test_generate_sample_information_files(self): exp_first_lines = { f'{self.good_run_id}_StudyA_13059_blanks.tsv': - 'BLANK1.1A\t2021-10-21\t193\t' + 'BLANK1.1A\t2017-05-23\t193\t' 'Control\tNegative\tSterile w' 'ater blank\tSterile water blank\turban biome\tres' 'earch facility\tsterile wate' @@ -448,7 +411,7 @@ def test_generate_sample_information_files(self): 'StudyA\tTRUE\t' 'UCSD\tFALSE', f'{self.good_run_id}_StudyB_11661_blanks.tsv': - 'BLANK.40.12G\t2021-10-21\t193\tControl' + 'BLANK.40.12G\t2017-05-23\t193\tControl' '\tNegative\tSterile water blank\tSterile water blank\turban ' 'biome\tresearch facility\tsterile water' '\tmisc environment\tUSA:CA:San Diego\tB' @@ -456,7 +419,7 @@ def test_generate_sample_information_files(self): 'nk\tmetagenome\t256318\tBLANK.40.12G\t' 'StudyB\tTRUE\tUCSD\tFALSE', f'{self.good_run_id}_StudyC_6123_blanks.tsv': - 'BLANK.41.12G\t2021-10-21\t193\tControl' + 'BLANK.41.12G\t2017-05-23\t193\tControl' '\tNegative\tSterile water blank\tSterile water blank\turban' ' biome\tresearch facility\tsterile wat' 'er\tmisc environment\tUSA:CA:San Diego' @@ -467,7 +430,7 @@ def test_generate_sample_information_files(self): exp_last_lines = { f'{self.good_run_id}_StudyA_13059_blanks.tsv': - 'BLANK4.4H\t2021-10-21\t193\t' + 'BLANK4.4H\t2017-05-23\t193\t' 'Control\tNegative\tSterile w' 'ater blank\tSterile water blank\turban biome\tres' 'earch facility\tsterile wate' @@ -478,7 +441,7 @@ def test_generate_sample_information_files(self): 'StudyA\tTRUE\t' 'UCSD\tFALSE', f'{self.good_run_id}_StudyB_11661_blanks.tsv': - 'BLANK.43.12H\t2021-10-21\t193\tControl' + 'BLANK.43.12H\t2017-05-23\t193\tControl' '\tNegative\tSterile water blank\tSterile water blank\turban' ' biome\tresearch facility\tsterile wat' 'er\tmisc environment\tUSA:CA:San Diego' @@ -486,7 +449,7 @@ def test_generate_sample_information_files(self): ' blank\tmetagenome\t256318\tBLANK.43.1' '2H\tStudyB\tTRUE\tUCSD\tFALSE', f'{self.good_run_id}_StudyC_6123_blanks.tsv': - 'BLANK.41.12G\t2021-10-21\t193\tContro' + 'BLANK.41.12G\t2017-05-23\t193\tContro' 'l\tNegative\tSterile water blank\tSterile water blank\turb' 'an biome\tresearch facility\tsterile ' 'water\tmisc environment\tUSA:CA:San D' @@ -1772,42 +1735,6 @@ def delete_rtacomplete_file(self): # make method idempotent pass - def test_required_file_checks(self): - # begin this test by deleting the RunInfo.txt file and verifying that - # Pipeline object will raise an Error. - self.delete_runinfo_file() - - with self.assertRaisesRegex(PipelineError, "required file 'RunInfo.xml" - "' is not present."): - Pipeline(self.good_config_file, self.good_run_id, - self.good_mapping_file_path, - self.output_file_path, - self.qiita_id, Pipeline.AMPLICON_PTYPE) - - # delete RTAComplete.txt and recreate RunInfo.txt file to verify that - # an Error is raised when only RTAComplete.txt is missing. 
- self.delete_rtacomplete_file() - self.create_runinfo_file() - - with self.assertRaisesRegex(PipelineError, "required file 'RTAComplete" - ".txt' is not present."): - Pipeline(self.good_config_file, self.good_run_id, - self.good_mapping_file_path, - self.output_file_path, - self.qiita_id, Pipeline.AMPLICON_PTYPE) - - # make RunInfo.xml file unreadable and verify that Pipeline object - # raises the expected Error. - self.create_rtacomplete_file() - self.make_runinfo_file_unreadable() - - with self.assertRaisesRegex(PipelineError, "RunInfo.xml is present, " - "but not readable"): - Pipeline(self.good_config_file, self.good_run_id, - self.good_mapping_file_path, self.output_file_path, - self.qiita_id, Pipeline.AMPLICON_PTYPE) - self.make_runinfo_file_readable() - def test_creation(self): # Pipeline should assert due to config_file with self.assertRaises(PipelineError) as e: @@ -2431,7 +2358,10 @@ def test_instrument_utils(self): 'date': '2023-12-15'}, '150629_K1001_0511_AH5L7GBCXX': {'id': 'K1001', 'type': 'HiSeq 4000', - 'date': '2015-06-29'}} + 'date': '2015-06-29'}, + 'r11111_20250101_111111': {'id': 'r11111', + 'type': 'Revio', + 'date': '2025-01-01'}} run_directories = [] for root, dirs, files in walk(self.path('sample_run_directories')):