|
| 1 | +import itertools |
1 | 2 | import logging
|
2 | 3 | from typing import Iterable, Optional
|
3 | 4 |
|
4 |
| -import biobear |
| 5 | +import dnaio |
5 | 6 | import duckdb
|
| 7 | +import pyarrow |
6 | 8 |
|
7 | 9 | from countess import VERSION
|
8 | 10 | from countess.core.parameters import BaseParam, BooleanParam, FloatParam, StringParam
|
@@ -32,10 +34,20 @@ def load_file(
|
32 | 34 | ) -> duckdb.DuckDBPyRelation:
|
33 | 35 | # Open the file, convert it to a RecordBatchReader and then
|
34 | 36 | # wrap that up as a DuckDBPyRelation so we can filter it.
|
35 |
| - reader = biobear.connect().read_fastq_file(filename) |
36 |
| - rel = cursor.from_arrow(reader.to_arrow_record_batch_reader()) |
| 37 | + fastq_iter = dnaio.open(filename, open_threads=1) |
| 38 | + record_batch_iter = ( |
| 39 | + pyarrow.RecordBatch.from_pylist([{'sequence': z.sequence, 'quality_scores': z.qualities} for z in y]) |
| 40 | + for y in itertools.batched(fastq_iter, 5000) |
| 41 | + ) |
| 42 | + rel = cursor.from_arrow( |
| 43 | + pyarrow.RecordBatchReader.from_batches( |
| 44 | + pyarrow.schema({'sequence': 'str', 'quality_scores': 'str'}), |
| 45 | + record_batch_iter |
| 46 | + ) |
| 47 | + ) |
37 | 48 | if row_limit is not None:
|
38 |
| - rel = rel.limit(row_limit) |
| 49 | + pass |
| 50 | + #rel = rel.limit(row_limit) |
39 | 51 |
|
40 | 52 | if self.min_avg_quality > 0:
|
41 | 53 | rel = rel.filter(
|
@@ -77,8 +89,17 @@ class LoadFastaPlugin(DuckdbLoadFileWithTheLotPlugin):
|
def load_file(
    self, cursor: duckdb.DuckDBPyConnection, filename: str, file_param: BaseParam, row_limit: Optional[int] = None
) -> duckdb.DuckDBPyRelation:
    """Stream the records of a sequence file into a DuckDB relation.

    The file is read with ``dnaio`` in chunks of 5000 records; each chunk
    becomes one Arrow record batch with string columns ``seq`` and ``qual``.

    Args:
        cursor: DuckDB connection used to wrap the Arrow reader.
        filename: Path handed to ``dnaio.open``.
        file_param: Not used by this loader.
        row_limit: When not ``None``, the relation is limited to this many rows.

    Returns:
        A relation over the Arrow record batch reader, limited to
        ``row_limit`` rows when one is given.
    """
    # One schema shared by the reader and by every batch.  Without an
    # explicit schema, from_pylist infers column types per batch, and a
    # batch whose `qual` values are all None (expected for FASTA input,
    # which carries no qualities -- confirm against dnaio docs) would be
    # typed as `null` rather than the declared string type.
    schema = pyarrow.schema({'seq': 'str', 'qual': 'str'})

    # Open eagerly so a bad path still fails here, not on first scan.
    reader = dnaio.open(filename, open_threads=1)

    def _record_batches():
        # Close the underlying file once the batches are exhausted or the
        # generator is discarded (e.g. when a row limit stops the scan early).
        with reader:
            for chunk in itertools.batched(reader, 5000):
                yield pyarrow.RecordBatch.from_pylist(
                    [{'seq': record.sequence, 'qual': record.qualities} for record in chunk],
                    schema=schema,
                )

    rel = cursor.from_arrow(pyarrow.RecordBatchReader.from_batches(schema, _record_batches()))
    if row_limit is not None:
        rel = rel.limit(row_limit)
    return rel
|
0 commit comments