37 commits
5d65b48 initial changes (antgonza, Aug 27, 2025)
844e4bd Merge branch 'main' of https://github.com/qiita-spots/qp-knight-lab-p… (antgonza, Aug 30, 2025)
55ab210 init push (antgonza, Sep 1, 2025)
4439472 fix InstrumentUtils (antgonza, Sep 2, 2025)
6bf3586 rm test_required_file_checks (antgonza, Sep 2, 2025)
411a119 m11111_20250101_111111_s4 (antgonza, Sep 2, 2025)
e8a2d0b add conda (antgonza, Sep 2, 2025)
5e16ab1 pacbio_generate_bam2fastq_commands (antgonza, Sep 3, 2025)
827a372 self.node_count -> self.nprocs (antgonza, Sep 3, 2025)
551bfd1 generate_sequence_counts (antgonza, Sep 3, 2025)
6311723 init changes to def _inject_data(self, wf): (antgonza, Sep 3, 2025)
40afadf read_length (antgonza, Sep 3, 2025)
52f4c53 pmls_extra_parameters (antgonza, Sep 3, 2025)
e18cf5d rm index in _inject_data (antgonza, Sep 3, 2025)
0734a6e dstats (antgonza, Sep 3, 2025)
281bf75 nuqc_job_single.sh (antgonza, Sep 3, 2025)
1aa102c rm extra , (antgonza, Sep 3, 2025)
01c862d print Profile selected (antgonza, Sep 4, 2025)
05036b7 counts.txt in _inject_data (antgonza, Sep 4, 2025)
d5adee0 demux_just_fwd (antgonza, Sep 4, 2025)
2fcfc28 demux_single -? demux_just_fwd (antgonza, Sep 4, 2025)
27f49d1 add cli for demux_just_fwd (antgonza, Sep 4, 2025)
dea4fd9 fix demux_just_fwd params (antgonza, Sep 5, 2025)
e596ade rm demux_just_fwd_processing splitter_binary (antgonza, Sep 5, 2025)
b27e7d6 self.files_regex = long_read (antgonza, Sep 5, 2025)
1666516 sample_id_column (antgonza, Sep 6, 2025)
0141440 mv self.read_length = read_length up (antgonza, Sep 6, 2025)
2b5700f _filter_empty_fastq_files (antgonza, Sep 6, 2025)
cc29d39 zip_longest (antgonza, Sep 7, 2025)
3a94746 raw_reads_r1r2 (antgonza, Sep 8, 2025)
6e12dd5 fastq.gz -> trimmed.fastq.gz (antgonza, Sep 9, 2025)
b2a308e barcode (antgonza, Sep 9, 2025)
1da1abb barcode -> barcode_id (antgonza, Sep 9, 2025)
e600bfb S000 (antgonza, Sep 10, 2025)
d73b6a7 del raw_reverse_seqs (antgonza, Sep 10, 2025)
c2b0a3e test filenames (antgonza, Sep 10, 2025)
b2b4d60 S000_L001_R1_001.counts.txt (antgonza, Sep 10, 2025)
2 changes: 1 addition & 1 deletion .github/workflows/qiita-plugin-ci.yml
@@ -83,7 +83,7 @@ jobs:
# seqtk is a conda-installed requirement for metapool and isn't
# installed automatically by its setup.py.
conda config --add channels bioconda
conda create -q --yes -n klp python=3.9 'seqtk>=1.4'
conda create -q --yes -n klp python=3.9 'seqtk>=1.4' pbtk

conda activate klp

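The new pbtk package is what later provides the bam2fastq binary used by the PacBio conversion step. A minimal sanity check along these lines (a sketch, not part of the PR; tool names assumed from this diff) could run after `conda activate klp`:

    # sketch: verify the klp conda env exposes the tools this PR relies on;
    # 'bam2fastq' is expected to come from pbtk, 'seqtk' from bioconda.
    from shutil import which

    for tool in ('seqtk', 'bam2fastq'):
        if which(tool) is None:
            raise RuntimeError(f'{tool} not on PATH; is the klp env active?')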
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -59,3 +59,5 @@ dependencies = [
configure_klp = "qp_klp.scripts.configure_klp:config"
start_klp = "qp_klp.scripts.start_klp:execute"
demux = "sequence_processing_pipeline.scripts.cli:demux"
demux_just_fwd = "sequence_processing_pipeline.Commands:demux_just_fwd"
pacbio_generate_bam2fastq_commands = "qp_klp.scripts.pacbio_commands:generate_bam2fastq_commands"
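Both new entry points are plain click commands, so they can also be exercised in-process. A sketch using click's test runner, with hypothetical paths:

    # sketch: equivalent to running
    #   pacbio_generate_bam2fastq_commands sample_list.tsv /runs/r1 out 4
    from click.testing import CliRunner
    from qp_klp.scripts.pacbio_commands import generate_bam2fastq_commands

    result = CliRunner().invoke(
        generate_bam2fastq_commands,
        ['sample_list.tsv', '/runs/r1', 'out', '4'])
    print(result.output)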
4 changes: 3 additions & 1 deletion src/qp_klp/Assays.py
@@ -494,6 +494,7 @@ def qc_reads(self):
# is not defined, just fallback to the SPP expected default regex
if not hasattr(self, 'files_regex'):
self.files_regex = 'SPP'

# base quality control used by multiple Assay types.
job = NuQCJob(self.raw_fastq_files_path,
self.pipeline.output_path,
@@ -517,7 +518,8 @@
bucket_size=config['bucket_size'],
length_limit=config['length_limit'],
cores_per_task=config['cores_per_task'],
files_regex=self.files_regex)
files_regex=self.files_regex,
read_length=self.read_length)

if 'NuQCJob' not in self.skip_steps:
job.run(callback=self.job_callback)
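For context, read_length is never assigned inside qc_reads() itself; it arrives as a class attribute contributed by the Protocol mixin (see the Protocol.py diff below). A stand-in sketch of that lookup, with slimmed-down hypothetical classes:

    # stand-ins for qp_klp.Protocol.PacBio and qp_klp.Assays.Metagenomic,
    # showing how qc_reads() sees read_length via the workflow's MRO.
    class PacBio:
        read_length = 'long'

    class Metagenomic:
        def qc_reads(self):
            # the real method forwards this to NuQCJob(read_length=...)
            return self.read_length

    class PacBioMetagenomicWorkflow(Metagenomic, PacBio):
        pass

    assert PacBioMetagenomicWorkflow().qc_reads() == 'long'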
66 changes: 66 additions & 0 deletions src/qp_klp/PacBioMetagenomicWorkflow.py
@@ -0,0 +1,66 @@
from .Protocol import PacBio
from sequence_processing_pipeline.Pipeline import Pipeline
from .Assays import Metagenomic
from .Assays import ASSAY_NAME_METAGENOMIC
from .FailedSamplesRecord import FailedSamplesRecord
from .Workflows import Workflow
import pandas as pd


class PacBioMetagenomicWorkflow(Workflow, Metagenomic, PacBio):
def __init__(self, **kwargs):
super().__init__(**kwargs)

self.mandatory_attributes = ['qclient', 'uif_path',
'lane_number', 'config_fp',
'run_identifier', 'output_dir', 'job_id',
'is_restart']

self.confirm_mandatory_attributes()

# second stage initializer that could conceivably be pushed down into
# specific children requiring specific parameters.
self.qclient = self.kwargs['qclient']

self.overwrite_prep_with_original = False
if 'overwrite_prep_with_original' in self.kwargs:
self.overwrite_prep_with_original = \
self.kwargs['overwrite_prep_with_original']
self.pipeline = Pipeline(self.kwargs['config_fp'],
self.kwargs['run_identifier'],
self.kwargs['uif_path'],
self.kwargs['output_dir'],
self.kwargs['job_id'],
ASSAY_NAME_METAGENOMIC,
lane_number=self.kwargs['lane_number'])

self.fsr = FailedSamplesRecord(self.kwargs['output_dir'],
self.pipeline.sample_sheet.samples)

samples = [
{'barcode': sample['barcode_id'],
'sample_name': sample['Sample_ID'],
'project_name': sample['Sample_Project'],
'lane': sample['Lane']}
for sample in self.pipeline.sample_sheet.samples]
df = pd.DataFrame(samples)
sample_list_fp = f"{self.kwargs['output_dir']}/sample_list.tsv"
df.to_csv(sample_list_fp, sep='\t', index=False)

self.master_qiita_job_id = self.kwargs['job_id']

self.lane_number = self.kwargs['lane_number']
self.is_restart = bool(self.kwargs['is_restart'])

if self.is_restart is True:
self.determine_steps_to_skip()

# this is a convenience member to allow testing w/out updating Qiita.
self.update = True

if 'update_qiita' in kwargs:
if not isinstance(kwargs['update_qiita'], bool):
raise ValueError("value for 'update_qiita' must be of "
"type bool")

self.update = kwargs['update_qiita']
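The sample_list.tsv written in this constructor is the hand-off to pacbio_generate_bam2fastq_commands (added below in scripts/pacbio_commands.py). A sketch of the shape both sides agree on, with made-up values:

    import pandas as pd

    # illustrative only: the four columns written by the workflow and read
    # back by generate_bam2fastq_commands; the row values are made up.
    df = pd.DataFrame([{'barcode': 'bc1001',
                        'sample_name': 'sampleA',
                        'project_name': 'Project_1',
                        'lane': 1}])
    df.to_csv('sample_list.tsv', sep='\t', index=False)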
101 changes: 98 additions & 3 deletions src/qp_klp/Protocol.py
@@ -1,14 +1,17 @@
from sequence_processing_pipeline.ConvertJob import ConvertJob
from sequence_processing_pipeline.ConvertJob import (
ConvertJob, ConvertPacBioBam2FastqJob)
from sequence_processing_pipeline.TellReadJob import TellReadJob
from sequence_processing_pipeline.SeqCountsJob import SeqCountsJob
from sequence_processing_pipeline.TRIntegrateJob import TRIntegrateJob
from sequence_processing_pipeline.PipelineError import PipelineError
from sequence_processing_pipeline.util import determine_orientation
from os.path import join, split, basename, dirname
from re import match
from os import makedirs, rename, walk
from os.path import join, split, basename, dirname, exists
from metapool import load_sample_sheet
from metapool.sample_sheet import PROTOCOL_NAME_ILLUMINA, PROTOCOL_NAME_TELLSEQ
from metapool.sample_sheet import (
PROTOCOL_NAME_ILLUMINA, PROTOCOL_NAME_TELLSEQ,
PROTOCOL_NAME_PACBIO_SMRT)
import pandas as pd
from glob import glob
from qiita_client.util import system_call
@@ -79,6 +82,26 @@ def subsample_reads(self):

class Illumina(Protocol):
protocol_type = PROTOCOL_NAME_ILLUMINA
# required files for successful operation with Illumina (made the
# default here); both RTAComplete.txt and RunInfo.xml should reside in
# the root of the run directory.
required_files = ['RTAComplete.txt', 'RunInfo.xml']
read_length = 'short'

def __init__(self) -> None:
super().__init__()

for some_file in self.required_files:
if not exists(join(self.run_dir, some_file)):
raise PipelineError(f"required file '{some_file}' is not "
f"present in {self.run_dir}.")

# verify that RunInfo.xml file is readable.
try:
fp = open(join(self.run_dir, 'RunInfo.xml'))
fp.close()
except PermissionError:
raise PipelineError('RunInfo.xml is present, but not readable')

def convert_raw_to_fastq(self):
def get_config(command):
@@ -156,6 +179,7 @@ def generate_sequence_counts(self):

class TellSeq(Protocol):
protocol_type = PROTOCOL_NAME_TELLSEQ
read_length = 'short'

def convert_raw_to_fastq(self):
config = self.pipeline.get_software_configuration('tell-seq')
@@ -369,3 +393,74 @@ def _post_process_file(self, fastq_file, mapping, lane):
rename(fastq_file, final_path)

return final_path


class PacBio(Protocol):
protocol_type = PROTOCOL_NAME_PACBIO_SMRT
read_length = 'long'

def convert_raw_to_fastq(self):
config = self.pipeline.get_software_configuration('pacbio_convert')

job = ConvertPacBioBam2FastqJob(
self.pipeline.run_dir,
self.pipeline.output_path,
self.pipeline.input_file_path,
config['queue'],
config['nodes'],
config['nprocs'],
config['wallclock_time_in_minutes'],
config['per_process_memory_limit'],
config['executable_path'],
config['modules_to_load'],
self.master_qiita_job_id)

self.raw_fastq_files_path = join(self.pipeline.output_path,
'ConvertJob')

# if ConvertJob already completed, then skip over the time-consuming
# portion but populate the needed member variables.
if 'ConvertJob' not in self.skip_steps:
job.run(callback=self.job_callback)

# audit the results to determine which samples failed to convert
# properly. Append these to the failed-samples report and also
# return the list directly to the caller.
failed_samples = job.audit(self.pipeline.get_sample_ids())
if hasattr(self, 'fsr'):
# NB 16S does not require a failed samples report and
# it is not performed by SPP.
self.fsr.write(failed_samples, job.__class__.__name__)

return failed_samples

def generate_sequence_counts(self):
# in other instances of generate_sequence_counts the sequence counting
# still needs to be done; however, for PacBio it has already been done
# and the results just need to be merged.
gz_files = glob(f'{self.raw_fastq_files_path}/*/*.fastq.gz')
data, missing_files = [], []

for gzf in gz_files:
cf = gzf.replace('.fastq.gz', '.counts.txt')
sn = basename(cf).replace('_R1.counts.txt', '')
if not exists(cf):
missing_files.append(sn)
continue
with open(cf, 'r') as fh:
counts = fh.read().strip()
data.append({'Sample_ID': sn,
'raw_reads_r1r2': counts,
'Lane': self.lane_number})

if missing_files:
raise ValueError(f'Missing count files: {missing_files}')

df = pd.DataFrame(data)
self.reports_path = join(self.pipeline.output_path,
'ConvertJob',
'SeqCounts.csv')
df.to_csv(self.reports_path, index=False)

def integrate_results(self):
pass
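A self-contained sketch of the merge generate_sequence_counts() performs, with made-up counts: each *.counts.txt holds the single integer written by fqtools count, and the merged table is exposed via self.reports_path for later steps.

    import pandas as pd

    # made-up per-sample totals standing in for the *.counts.txt contents
    counts = {'sampleA': '12345', 'sampleB': '67890'}
    df = pd.DataFrame([{'Sample_ID': sn, 'raw_reads_r1r2': c, 'Lane': '1'}
                       for sn, c in counts.items()])
    df.to_csv('SeqCounts.csv', index=False)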
7 changes: 6 additions & 1 deletion src/qp_klp/WorkflowFactory.py
@@ -3,6 +3,7 @@
from .StandardMetatranscriptomicWorkflow import \
StandardMetatranscriptomicWorkflow
from .TellseqMetagenomicWorkflow import TellSeqMetagenomicWorkflow
from .PacBioMetagenomicWorkflow import PacBioMetagenomicWorkflow
from sequence_processing_pipeline.Pipeline import Pipeline
from metapool import load_sample_sheet
from metapool.sample_sheet import SAMPLE_SHEETS_BY_PROTOCOL as SSBP
@@ -14,7 +15,8 @@ class WorkflowFactory():
WORKFLOWS = [StandardMetagenomicWorkflow,
StandardMetatranscriptomicWorkflow,
StandardAmpliconWorkflow,
TellSeqMetagenomicWorkflow]
TellSeqMetagenomicWorkflow,
PacBioMetagenomicWorkflow]

@classmethod
def _get_instrument_type(cls, sheet):
@@ -76,6 +78,9 @@ def generate_workflow(cls, **kwargs):
"sample-sheet or a mapping-file.")

for workflow in WorkflowFactory.WORKFLOWS:
if workflow.read_length not in {'short', 'long'}:
raise ValueError('Invalid read_length: '
f'{workflow.read_length} for {workflow}')
if workflow.assay_type == assay_type:
if workflow.protocol_type == instrument_type:
# return instantiated workflow object
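With the PacBio workflow registered, factory selection keys on both the assay and the protocol read from the sample sheet. A sketch of the call (kwargs abbreviated; the paths and qclient instance are hypothetical):

    from qp_klp.WorkflowFactory import WorkflowFactory

    wf = WorkflowFactory.generate_workflow(
        qclient=None,  # stand-in for an authenticated qiita_client
        uif_path='pacbio_sample_sheet.csv',  # hypothetical sheet path
        config_fp='configuration.json',
        run_identifier='m11111_20250101_111111_s4',
        output_dir='out', job_id='job-1',
        lane_number='1', is_restart=False)
    # with a PacBio SMRT sheet this should resolve to
    # PacBioMetagenomicWorkflow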
5 changes: 5 additions & 0 deletions src/qp_klp/Workflows.py
@@ -8,6 +8,7 @@
import logging
from shutil import rmtree
from .Assays import ASSAY_NAME_AMPLICON
from metapool.sample_sheet import PROTOCOL_NAME_PACBIO_SMRT
from sequence_processing_pipeline.util import determine_orientation


@@ -600,6 +601,10 @@ def _get_postqc_fastq_files(self, out_dir, project):
if self.pipeline.pipeline_type != ASSAY_NAME_AMPLICON:
del (files['raw_barcodes'])

# PacBio doesn't have reverse reads
if self.protocol_type == PROTOCOL_NAME_PACBIO_SMRT:
del (files['raw_reverse_seqs'])

# confirm expected lists of reads are not empty.
for f_type in files:
if not files[f_type]:
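A minimal sketch of the bucket filtering above, with made-up file lists; PacBio runs are forward-only, so the reverse-read bucket is dropped before the empty-list check:

    files = {'raw_barcodes': [],
             'raw_forward_seqs': ['s1_S000_L001_R1_001.fastq.gz'],
             'raw_reverse_seqs': []}
    del files['raw_barcodes']       # non-amplicon assays
    del files['raw_reverse_seqs']   # PacBio: no reverse reads
    assert all(files.values())      # an empty bucket would be an error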
53 changes: 53 additions & 0 deletions src/qp_klp/scripts/pacbio_commands.py
@@ -0,0 +1,53 @@
#!/usr/bin/env python

# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
import click
import pandas as pd
from glob import glob
from os import makedirs


@click.command()
@click.argument('sample_list', required=True)
@click.argument('run_folder', required=True)
@click.argument('outdir', required=True)
@click.argument('threads', required=True, default=1)
def generate_bam2fastq_commands(sample_list, run_folder, outdir, threads):
"""Generates the bam2fastq commands"""
df = pd.read_csv(sample_list, sep='\t')
files = {f.split('.')[-2]: f
for f in glob(f'{run_folder}/*/hifi_reads/*.bam')}

makedirs(outdir, exist_ok=True)

commands, missing_files = [], []
for _, row in df.iterrows():
bc = row['barcode']
sn = row['sample_name']
pn = row['project_name']
lane = row['lane']
if bc not in files:
missing_files.append(bc)
continue
od = f'{outdir}/{pn}'

makedirs(od, exist_ok=True)
fn = f'{od}/{sn}_S000_L00{lane}_R1_001'
cmd = (f'bam2fastq -j {threads} -o {fn} -c 9 '
f'{files[bc]}; '
f'fqtools count {fn}.fastq.gz > '
f'{fn}.counts.txt')
commands.append(cmd)

if missing_files:
raise ValueError(
f'{run_folder} is missing barcodes: {missing_files}')

for cmd in commands:
print(cmd)
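For reference, the script prints one command per sample; a worked example of the filename contract (all names made up), mirroring the loop above:

    # sketch: reconstructing one command the script would print
    files = {'bc1001': '/runs/r1/1_A01/hifi_reads/movie.hifi_reads.bc1001.bam'}
    sn, pn, lane, threads, outdir = 'sampleA', 'Project_1', 1, 4, 'out'
    fn = f'{outdir}/{pn}/{sn}_S000_L00{lane}_R1_001'
    print(f'bam2fastq -j {threads} -o {fn} -c 9 {files["bc1001"]}; '
          f'fqtools count {fn}.fastq.gz > {fn}.counts.txt')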