Description
app.py
import streamlit as st
import pandas as pd
import dask.dataframe as dd
import io
import os
import shutil
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime
# ---------------------------
# Configuration
# ---------------------------
OUTPUT_DIR = "processed_data"
os.makedirs(OUTPUT_DIR, exist_ok=True)
st.set_page_config(page_title="Auto EDL + Dashboards", layout="wide")
# ---------------------------
# Utility / ETL helpers
# ---------------------------
def read_file_to_dask(uploaded_file, sample_rows=10000):
    """
    Accepts a file-like uploaded_file (from Streamlit), returns a Dask DataFrame.
    Supports CSV, Excel and Parquet.
    Uses dtype inference from a small pandas sample for performance.
    """
    name = uploaded_file.name.lower()
    uploaded_file.seek(0)
    if name.endswith(".csv"):
        # Read a small sample with pandas to infer dtypes (faster than a full scan)
        sample_df = pd.read_csv(uploaded_file, nrows=sample_rows)
        dtypes = sample_df.dtypes.to_dict()
        uploaded_file.seek(0)
        # dd.read_csv expects a path, not a file object, so spill the upload to disk first
        tmp_path = os.path.join(OUTPUT_DIR, f"tmp_{int(datetime.now().timestamp())}.csv")
        with open(tmp_path, "wb") as f:
            f.write(uploaded_file.read())
        # let Dask read the CSV with the inferred dtypes
        df = dd.read_csv(tmp_path, assume_missing=True, dtype=dtypes, blocksize="64MB")
        return df
    elif name.endswith((".xls", ".xlsx")):
        # pandas reads Excel (Dask has no direct read_excel)
        uploaded_file.seek(0)
        pdf = pd.read_excel(uploaded_file)
        return dd.from_pandas(pdf, npartitions=8)
    elif name.endswith(".parquet"):
        # Save uploaded bytes to disk, then read with Dask
        tmp_path = os.path.join(OUTPUT_DIR, f"tmp_{int(datetime.now().timestamp())}.parquet")
        with open(tmp_path, "wb") as f:
            f.write(uploaded_file.read())
        df = dd.read_parquet(tmp_path)
        return df
    else:
        raise ValueError("Unsupported file type. Upload CSV, Excel or Parquet.")
def basic_cleaning(ddf, config):
    """
    Perform a set of automatic cleaning tasks on a Dask DataFrame:
    - normalize column names
    - drop duplicate rows (if requested)
    - parse date columns heuristically
    - fill missing values for numeric columns (median) and categorical columns (mode)
    - create a small set of features (year/month/day if a date column is present)
    """
    # normalize column names
    ddf.columns = [c.strip().lower().replace(" ", "_") for c in ddf.columns]

    # drop exact duplicates if requested
    if config.get("drop_duplicates", True):
        ddf = ddf.drop_duplicates()

    # detect date columns by dtype or name
    date_cols = [c for c, dt in ddf.dtypes.items() if "datetime" in str(dt).lower()]
    # heuristic by name
    for c in ddf.columns:
        if c not in date_cols and any(k in c for k in ["date", "txn", "transaction", "posted", "time"]):
            try:
                ddf[c] = dd.to_datetime(ddf[c], errors="coerce")
                date_cols.append(c)
            except Exception:
                pass

    # numeric vs categorical columns
    num_cols = [c for c, dt in ddf.dtypes.items() if "int" in str(dt).lower() or "float" in str(dt).lower()]
    cat_cols = [c for c in ddf.columns if c not in num_cols and c not in date_cols]

    # Fill numeric with median (computed with Dask)
    for c in num_cols:
        try:
            med = ddf[c].quantile(0.5).compute()
            ddf[c] = ddf[c].fillna(med)
        except Exception:
            ddf[c] = ddf[c].fillna(0)

    # Fill categorical with mode (approximate)
    for c in cat_cols:
        try:
            top = ddf[c].value_counts().nlargest(1).compute()
            if not top.empty:
                mode_val = top.index[0]
                ddf[c] = ddf[c].fillna(mode_val)
            else:
                ddf[c] = ddf[c].fillna("missing")
        except Exception:
            ddf[c] = ddf[c].fillna("missing")

    # Create date features if any date column exists (take the first)
    if date_cols:
        dtc = date_cols[0]
        ddf["__year"] = dd.to_datetime(ddf[dtc], errors="coerce").dt.year
        ddf["__month"] = dd.to_datetime(ddf[dtc], errors="coerce").dt.month
        ddf["__day"] = dd.to_datetime(ddf[dtc], errors="coerce").dt.day

    return ddf, {"date_cols": date_cols, "num_cols": num_cols, "cat_cols": cat_cols}
def save_dask_to_parquet(ddf, base_name="cleaned"):
out_path = os.path.join(OUTPUT_DIR, f"{base_name}_{int(datetime.now().timestamp())}.parquet")
ddf.to_parquet(out_path, engine="pyarrow", write_index=False)
return out_path
# ---------------------------
# Streamlit UI
# ---------------------------
st.title("📂 Auto EDL + Dashboards — Upload & Visualize (Local)")
st.markdown(
"""
Upload a raw credit-card dataset (CSV / Excel / Parquet).
The app will automatically:
- Extract & load using Dask for large files
- Perform data cleaning & basic feature creation (EDL)
- Store cleaned data as Parquet for fast re-use
- Build interactive dashboards (summary, histograms, correlations)
"""
)
# File uploader
uploaded_file = st.file_uploader("Upload CSV / Excel / Parquet (supports large files)", type=["csv","xlsx","xls","parquet"])
drop_dup = st.checkbox("Drop duplicate rows (recommended)", value=True)
max_rows_preview = st.slider("Max rows to preview (in-browser)", 1000, 10000, 2000, step=500)
if uploaded_file:
    # Stage 1: Read into Dask
    with st.spinner("Reading file into Dask (scales to large files)..."):
        try:
            ddf = read_file_to_dask(uploaded_file)
        except Exception as e:
            st.error(f"Failed to read file: {e}")
            st.stop()

    st.success("File loaded into Dask dataframe.")
    st.write("Dask partitions:", ddf.npartitions)

    # Show schema (dtypes)
    st.subheader("Schema (inferred dtypes)")
    st.write(ddf.dtypes.astype(str))

    # compute a small sample for in-browser preview
    sample = ddf.head(max_rows_preview)
    st.dataframe(sample.head(10))
    # Run cleaning
    if st.button("Run EDL and Generate Dashboards"):
        config = {"drop_duplicates": drop_dup}
        with st.spinner("Running cleaning steps... this may take some seconds for 500k rows..."):
            cleaned_ddf, meta = basic_cleaning(ddf, config)
            # persist cleaned ddf (computes)
            cleaned_ddf = cleaned_ddf.persist()
            # Save to parquet
            out_parquet = save_dask_to_parquet(cleaned_ddf, base_name="credit_cleaned")

        st.success(f"Cleaning done — saved cleaned parquet to `{out_parquet}`")

        # Load a sample (pandas) for quick plotting in-browser
        st.subheader("Quick Summary")
        sample_pd = cleaned_ddf.head(max_rows_preview)
        st.write("Rows (sample shown):", len(sample_pd))
        st.dataframe(sample_pd.head(10))
        # Dashboard layout
        st.subheader("Automated Dashboards")

        # 1) Numeric summary
        st.markdown("### Numeric columns summary")
        if meta["num_cols"]:
            num_summary = sample_pd[meta["num_cols"]].describe().T
            st.dataframe(num_summary)
            # show histograms for top 4 numeric cols
            cols_to_plot = meta["num_cols"][:4]
            for c in cols_to_plot:
                fig = px.histogram(sample_pd, x=c, nbins=50, title=f"Distribution of {c}")
                st.plotly_chart(fig, use_container_width=True)
        else:
            st.info("No numeric columns detected.")

        # 2) Categorical top values
        st.markdown("### Categorical columns top values")
        if meta["cat_cols"]:
            for c in meta["cat_cols"][:6]:
                top = sample_pd[c].value_counts().nlargest(10).reset_index()
                top.columns = [c, "count"]
                fig = px.bar(top, x=c, y="count", title=f"Top values for {c}")
                st.plotly_chart(fig, use_container_width=True)
        else:
            st.info("No categorical columns detected.")

        # 3) Correlation heatmap (numeric)
        if meta["num_cols"] and len(meta["num_cols"]) > 1:
            st.markdown("### Correlation (numeric)")
            corr = sample_pd[meta["num_cols"]].corr()
            fig = px.imshow(corr, text_auto=True, title="Numeric correlation matrix")
            st.plotly_chart(fig, use_container_width=True)

        # 4) Time series if date detected
        if meta["date_cols"]:
            st.markdown("### Time-series overview")
            dtcol = meta["date_cols"][0]
            # aggregate by month if present
            try:
                sample_pd[dtcol] = pd.to_datetime(sample_pd[dtcol], errors="coerce")
                agg = sample_pd.set_index(dtcol).resample('M').size().rename("count").reset_index()
                fig = px.line(agg, x=dtcol, y="count", title=f"Transactions per month ({dtcol})")
                st.plotly_chart(fig, use_container_width=True)
            except Exception:
                st.info("Couldn't create time-series plot for the detected date column.")
        # 5) Download cleaned data
        st.markdown("### Download cleaned data (Parquet)")
        st.write(f"Saved: `{out_parquet}`")
        # Dask writes Parquet as a directory of part files, so zip it before offering a download
        zip_path = shutil.make_archive(out_parquet, "zip", out_parquet)
        with open(zip_path, "rb") as f:
            st.download_button("Download cleaned parquet (zip)", data=f, file_name=os.path.basename(zip_path), mime="application/zip")

        st.success("Dashboards generated. Scroll up to view charts.")
        st.info("Tip: Use Parquet as your upload format next time for fastest processing.")
else:
    st.info("Upload your credit-card dataset to get started (CSV, Excel, Parquet).")