@@ -34,33 +34,43 @@ def load_file(
     ) -> duckdb.DuckDBPyRelation:
         # Open the file, convert it to a RecordBatchReader and then
         # wrap that up as a DuckDBPyRelation so we can filter it.
-        fastq_iter = dnaio.open(filename, open_threads=1)
+        logger.debug("Loading file %s row_limit %s", filename, row_limit)
+
+        # Take up to row_limit records from this file
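+        # (itertools.islice with stop=None reads everything, so row_limit=None means no limit)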
+        fastq_iter = itertools.islice(dnaio.open(filename, open_threads=1), row_limit)
+
+        def _record_to_dict(record):
+            d = {"sequence": record.sequence}
+            if self.header_column:
+                d["header"] = record.name
+            return d
+
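+        # Mean Phred quality score: quality characters are ASCII codes offset by +33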
+        def _avg_quality(record):
+            return sum(ord(c) for c in record.qualities) / len(record.qualities) - 33
+
+        pyarrow_schema = pyarrow.schema([pyarrow.field("sequence", pyarrow.string())])
+        if self.header_column:
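+            # Schema.append() returns a new schema (pyarrow schemas are immutable), so re-assign it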
+            pyarrow_schema = pyarrow_schema.append(pyarrow.field("header", pyarrow.string()))
+
+        # Generator which batches records 5000 at a time into RecordBatches
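+        # and keeps only records whose mean quality meets min_avg_quality (a threshold of 0 or less disables the filter)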
         record_batch_iter = (
-            pyarrow.RecordBatch.from_pylist([{'sequence': z.sequence, 'quality_scores': z.qualities} for z in y])
-            for y in itertools.batched(fastq_iter, 5000)
-        )
-        rel = cursor.from_arrow(
-            pyarrow.RecordBatchReader.from_batches(
-                pyarrow.schema({'sequence': 'str', 'quality_scores': 'str'}),
-                record_batch_iter
+            pyarrow.RecordBatch.from_pylist(
+                [
+                    _record_to_dict(record)
+                    for record in batch
+                    if self.min_avg_quality <= 0 or self.min_avg_quality <= _avg_quality(record)
+                ]
             )
+            for batch in itertools.batched(fastq_iter, 5000)
         )
-        if row_limit is not None:
-            pass
-            #rel = rel.limit(row_limit)
-
-        if self.min_avg_quality > 0:
-            rel = rel.filter(
-                "list_aggregate(list_transform(string_split(quality_scores, ''), x -> ord(x)), 'avg') - 33 >= %f"
-                % self.min_avg_quality.value
-            )
+
+        # We can turn that generator of RecordBatches into a temporary table
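+        # (RecordBatchReader.from_batches pulls batches from the generator lazily rather than materialising them all up front)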
+        rel = cursor.from_arrow(pyarrow.RecordBatchReader.from_batches(pyarrow_schema, record_batch_iter))

         if self.group:
             rel = rel.aggregate("sequence, count(*) as count")
-        elif self.header_column:
-            rel = rel.project("sequence, name || ' ' || description as header")
-        else:
-            rel = rel.project("sequence")
+
+        logger.debug("Loading file %s row_limit %s done", filename, row_limit)
         return rel

     def combine(
@@ -83,23 +93,17 @@ class LoadFastaPlugin(DuckdbLoadFileWithTheLotPlugin):

     file_types = [("FASTA", [".fasta", ".fa", ".fasta.gz", ".fa.gz", ".fasta.bz2", ".fa.bz2"])]

-    sequence_column = StringParam("Sequence Column", "sequence")
-    header_column = StringParam("Header Column", "header")
-
     def load_file(
         self, cursor: duckdb.DuckDBPyConnection, filename: str, file_param: BaseParam, row_limit: Optional[int] = None
     ) -> duckdb.DuckDBPyRelation:
-        fasta_iter = dnaio.open(filename, open_threads=1)
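+        # every FASTA record has a header line, so both sequence and header columns are always in the schema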
+        pyarrow_schema = pyarrow.schema(
+            [pyarrow.field("sequence", pyarrow.string()), pyarrow.field("header", pyarrow.string())]
+        )
+
+        fasta_iter = itertools.islice(dnaio.open(filename, open_threads=1), row_limit)
         record_batch_iter = (
-            pyarrow.RecordBatch.from_pylist([{'seq': z.sequence, 'qual': z.qualities} for z in y])
+            pyarrow.RecordBatch.from_pylist([{"sequence": z.sequence, "header": z.name} for z in y])
             for y in itertools.batched(fasta_iter, 5000)
         )
-        rel = cursor.from_arrow(
-            pyarrow.RecordBatchReader.from_batches(
-                pyarrow.schema({'seq': 'str', 'qual': 'str'}),
-                record_batch_iter
-            )
-        )
-        if row_limit is not None:
-            rel = rel.limit(row_limit)
+        rel = cursor.from_arrow(pyarrow.RecordBatchReader.from_batches(pyarrow_schema, record_batch_iter))
         return rel