Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ benchmark/bin/env.sh
benchmark/src/results
benchmark/src/results/overlap
mprofile*dat
*csv
*csv
!docs/notebooks/example.csv
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

514 changes: 514 additions & 0 deletions docs/notebooks/base_sequence_quality.ipynb

Large diffs are not rendered by default.

201 changes: 201 additions & 0 deletions docs/notebooks/example.csv

Large diffs are not rendered by default.

800 changes: 800 additions & 0 deletions docs/notebooks/example.fastq

Large diffs are not rendered by default.

Binary file added docs/notebooks/example.parquet
Binary file not shown.
250 changes: 250 additions & 0 deletions docs/notebooks/report.html

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions polars_bio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from polars_bio.polars_bio import InputFormat, ReadOptions, VcfReadOptions

from .base_sequnce_quality_vis import visualize_base_sequence_quality
from .context import ctx, set_option
from .io import (
describe_vcf,
Expand All @@ -16,6 +17,7 @@
from .polars_ext import PolarsRangesOperations as LazyFrame
from .range_op import FilterOp, count_overlaps, coverage, merge, nearest, overlap
from .range_viz import visualize_intervals
from .quality_stats import base_sequence_quality

POLARS_BIO_MAX_THREADS = "datafusion.execution.target_partitions"

Expand All @@ -29,6 +31,7 @@
"coverage",
"ctx",
"FilterOp",
"visualize_base_sequence_quality",
"visualize_intervals",
"read_bam",
"read_vcf",
Expand All @@ -45,4 +48,5 @@
"ReadOptions",
"VcfReadOptions",
"set_option",
"base_sequence_quality",
]
51 changes: 51 additions & 0 deletions polars_bio/base_sequnce_quality_vis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from typing import Union

import pandas as pd
import polars as pl
from matplotlib import pyplot as plt


def visualize_base_sequence_quality(df: Union[pd.DataFrame, pl.DataFrame]) -> None:
    """
    Visualize per-position base sequence quality as box plots overlaid with the mean.

    Parameters:
        df: Pandas DataFrame or Polars DataFrame containing the base sequence
            quality results. Expected columns: "pos", "lower", "q1", "median",
            "q3", "upper" and "avg".
    """
    assert isinstance(
        df, (pd.DataFrame, pl.DataFrame)
    ), "df must be a Pandas or Polars DataFrame"
    # Normalize to pandas so .iterrows() / sort_values work uniformly.
    df = df if isinstance(df, pd.DataFrame) else df.to_pandas()
    df = df.sort_values(by="pos")

    # One pre-computed box per read position; whiskers come from the stats table,
    # so no raw observations are needed (hence ax.bxp rather than ax.boxplot).
    boxes = [
        {
            "label": int(row["pos"]),
            "whislo": row["lower"],
            "q1": row["q1"],
            "med": row["median"],
            "q3": row["q3"],
            "whishi": row["upper"],
        }
        for _, row in df.iterrows()
    ]

    fig, ax = plt.subplots()
    fig.set_size_inches(15, 5)

    # NOTE(review): bxp places boxes at x = 1..n while the mean line is drawn at
    # pos + 1 — these only coincide if "pos" is a contiguous 0-based index; confirm.
    plot = ax.plot(df["pos"] + 1, df["avg"])
    box_plot = ax.bxp(boxes, showfliers=False)

    ax.set_title("base sequence quality")
    ax.set_ylabel("Phred score")
    ax.set_xlabel("Position in read (bp)")

    ax.legend(
        [plot[0], box_plot["medians"][0]],
        ["Average of phred score", "Median of phred score"],
    )

    # Shrink tick labels so long reads (many positions) stay legible.
    for label in ax.get_xticklabels():
        label.set_fontsize(6)

    plt.show()
59 changes: 59 additions & 0 deletions polars_bio/quality_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from pathlib import Path
from typing import Union
import datafusion
import polars as pl
import pandas as pd
import pyarrow as pa
from .context import ctx
from polars_bio.polars_bio import (
base_sequance_quality_scan,
base_sequance_quality_frame,
)


def base_sequence_quality(
    df: Union[str, Path, pl.DataFrame, pl.LazyFrame, pd.DataFrame],
    quality_scores_column: str = "quality_scores",
    output_type: str = "polars.DataFrame",
) -> Union[pl.DataFrame, pd.DataFrame]:
    """
    Compute base sequence quality statistics from various dataframe/file types.

    Args:
        df: Input data as a file path (Parquet, CSV, BED, VCF or FASTQ; a path
            without an extension is also accepted) or an in-memory
            Polars/pandas dataframe.
        quality_scores_column: Name of the column with quality scores.
        output_type: Output type, either "polars.DataFrame" or "pandas.DataFrame".

    Returns:
        DataFrame with base sequence quality statistics.

    Raises:
        ValueError: If the file extension or ``output_type`` is unsupported.
        TypeError: If ``df`` is not a supported dataframe type.
    """
    # Validate output_type up front so a bad value fails before any (potentially
    # expensive) scan or Arrow conversion is performed.
    if output_type not in ("polars.DataFrame", "pandas.DataFrame"):
        raise ValueError("output_type must be 'polars.DataFrame' or 'pandas.DataFrame'")

    if isinstance(df, (str, Path)):
        path = str(df)
        supported_exts = {".parquet", ".csv", ".bed", ".vcf", ".fastq"}
        ext = set(Path(path).suffixes)
        # Reject only paths that DO have extensions, none of which is supported;
        # extension-less inputs are passed through to the scanner as-is.
        if ext and not (supported_exts & ext):
            raise ValueError(
                "Input file must be a Parquet, CSV, BED, VCF, or FASTQ file."
            )
        result: datafusion.DataFrame = base_sequance_quality_scan(
            ctx, path, quality_scores_column
        )
    else:
        # In-memory inputs are converted to an Arrow table and handed to the
        # native frame-based implementation.
        if isinstance(df, pl.LazyFrame):
            arrow_table = df.collect().to_arrow()
        elif isinstance(df, pl.DataFrame):
            arrow_table = df.to_arrow()
        elif isinstance(df, pd.DataFrame):
            arrow_table = pa.Table.from_pandas(df)
        else:
            raise TypeError("Unsupported dataframe type.")
        result: datafusion.DataFrame = base_sequance_quality_frame(
            ctx, arrow_table, quality_scores_column
        )

    if output_type == "polars.DataFrame":
        return result.to_polars()
    return result.to_pandas()
1 change: 0 additions & 1 deletion src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ impl PyBioSessionContext {
pub fn new(seed: String, catalog_dir: String) -> PyResult<Self> {
let ctx = create_context().unwrap();
let session_config: HashMap<String, String> = HashMap::new();

Ok(PyBioSessionContext {
ctx,
session_config,
Expand Down
49 changes: 48 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod sequence_quality_histogram;
mod context;
mod operation;
mod option;
mod quantile_stats;
mod query;
mod scan;
mod streaming;
Expand All @@ -16,6 +18,7 @@ use datafusion::datasource::MemTable;
use datafusion_python::dataframe::PyDataFrame;
use datafusion_vcf::storage::VcfReader;
use log::{debug, error, info};
use operation::do_base_sequence_quality;
use polars_lazy::prelude::{LazyFrame, ScanArgsAnonymous};
use polars_python::error::PyPolarsErr;
use polars_python::lazyframe::PyLazyFrame;
Expand All @@ -33,6 +36,7 @@ use crate::utils::convert_arrow_rb_schema_to_polars_df_schema;

const LEFT_TABLE: &str = "s1";
const RIGHT_TABLE: &str = "s2";
const DEFAULT_TABLE_NAME: &str = "unnamed_table";
const DEFAULT_COLUMN_NAMES: [&str; 3] = ["contig", "start", "end"];

#[pyfunction]
Expand Down Expand Up @@ -403,6 +407,48 @@ fn py_from_polars(
})
}

/// Scan the file at `path`, register it under the default table name, and
/// compute per-position base sequence quality statistics over `column`.
///
/// Returns the result as a DataFusion `PyDataFrame` for the Python bindings.
#[pyfunction]
#[pyo3(signature = (py_ctx, path, column))]
fn base_sequance_quality_scan(
    py: Python<'_>,
    py_ctx: &PyBioSessionContext,
    path: String,
    column: String,
) -> PyResult<PyDataFrame> {
    // Release the GIL while DataFusion does the heavy lifting.
    py.allow_threads(|| {
        let ctx = &py_ctx.ctx;
        let rt = Runtime::new().unwrap();
        maybe_register_table(path, &DEFAULT_TABLE_NAME.to_string(), None, ctx, &rt);
        // `column` is already an owned String — pass it through without the
        // redundant `to_string()` clone.
        let data_frame = rt.block_on(do_base_sequence_quality(
            ctx,
            DEFAULT_TABLE_NAME.to_string(),
            column,
        ));
        Ok(PyDataFrame::new(data_frame))
    })
}

/// Register an in-memory Arrow stream under the default table name and
/// compute per-position base sequence quality statistics over `column`.
///
/// Returns the result as a DataFusion `PyDataFrame` for the Python bindings.
#[pyfunction]
#[pyo3(signature = (py_ctx, df, column))]
fn base_sequance_quality_frame(
    py: Python<'_>,
    py_ctx: &PyBioSessionContext,
    df: PyArrowType<ArrowArrayStreamReader>,
    column: String,
) -> PyResult<PyDataFrame> {
    // Release the GIL while DataFusion does the heavy lifting.
    py.allow_threads(|| {
        let ctx = &py_ctx.ctx;
        let rt = Runtime::new().unwrap();
        register_frame(py_ctx, df, DEFAULT_TABLE_NAME.to_string());
        // `column` is already an owned String — pass it through without the
        // redundant `to_string()` clone.
        let data_frame = rt.block_on(do_base_sequence_quality(
            ctx,
            DEFAULT_TABLE_NAME.to_string(),
            column,
        ));
        Ok(PyDataFrame::new(data_frame))
    })
}

#[pymodule]
fn polars_bio(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
pyo3_log::init();
Expand All @@ -417,7 +463,8 @@ fn polars_bio(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(py_describe_vcf, m)?)?;
m.add_function(wrap_pyfunction!(py_register_view, m)?)?;
m.add_function(wrap_pyfunction!(py_from_polars, m)?)?;
// m.add_function(wrap_pyfunction!(unary_operation_scan, m)?)?;
m.add_function(wrap_pyfunction!(base_sequance_quality_frame, m)?)?;
m.add_function(wrap_pyfunction!(base_sequance_quality_scan, m)?)?;
m.add_class::<PyBioSessionContext>()?;
m.add_class::<FilterOp>()?;
m.add_class::<RangeOp>()?;
Expand Down
41 changes: 41 additions & 0 deletions src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use log::{debug, info};
use sequila_core::session_context::{Algorithm, SequilaConfig};
use tokio::runtime::Runtime;

use crate::sequence_quality_histogram::SequenceQualityHistogramProvider;
use crate::context::set_option_internal;
use crate::option::{FilterOp, RangeOp, RangeOptions};
use crate::quantile_stats::QuantileStatsTableProvider;
use crate::query::{count_overlaps_query, nearest_query, overlap_query};
use crate::udtf::CountOverlapsProvider;
use crate::utils::default_cols_to_string;
Expand Down Expand Up @@ -191,6 +193,45 @@ async fn do_count_overlaps_coverage_naive(
ctx.sql(&query).await.unwrap()
}

/// Compute per-position base sequence quality statistics for `column` of the
/// registered table `table`, in two stages: (1) a histogram provider exposed as
/// "<table>_decoded" and aggregated by (pos, score); (2) a quantile provider
/// over that aggregated plan, exposed as "<table>_quantiles" and fully selected.
pub(crate) async fn do_base_sequence_quality(
    ctx: &ExonSession,
    table: String,
    column: String,
) -> datafusion::dataframe::DataFrame {
    let session = Arc::new(ctx.session.clone());
    // Stage 1: table provider that decodes quality strings into a
    // per-(position, score) histogram over the source table.
    let base_provider = Arc::new(SequenceQualityHistogramProvider::new(
        session.clone(),
        table.clone(),
        column,
    ));

    // Deregister first so repeated calls with the same table name don't
    // collide with a stale registration from a previous run.
    let base_table_name = format!("{}_decoded", table);
    session.deregister_table(base_table_name.clone()).unwrap();
    session
        .register_table(&base_table_name, base_provider)
        .unwrap();

    // Collapse partial histograms into one count per (pos, score) pair.
    let query = format!(
        "SELECT pos, score, SUM(count) as count FROM {} GROUP BY pos, score",
        base_table_name
    );
    let base_df = ctx.sql(&query).await.unwrap();
    let base_plan = base_df.create_physical_plan().await.unwrap();

    // Stage 2: quantile provider consumes the aggregated physical plan and
    // derives the box-plot statistics (quartiles, whiskers, etc.).
    let quantile_provider = Arc::new(QuantileStatsTableProvider::new(base_plan));

    let quantile_table_name = format!("{}_quantiles", table);
    session
        .deregister_table(quantile_table_name.clone())
        .unwrap();
    session
        .register_table(&quantile_table_name, quantile_provider)
        .unwrap();

    // Materialize the final statistics table for the caller.
    let query = format!("SELECT * FROM {}", quantile_table_name);
    ctx.sql(&query).await.unwrap()
}

async fn get_non_join_columns(
table_name: String,
join_columns: Vec<String>,
Expand Down
Loading