Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ logs/
dev-tests/
coverage.json
dev-tests/
warehouse/oso_sqlmesh/models/intermediate/int_my_first_model.sql

# typescript
*.tsbuildinfo
Expand Down
38 changes: 30 additions & 8 deletions warehouse/metrics_tools/local/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrm. This specific file won't do anything for any model. Probably best not to touch this one. This is probably something we should deprecate but it shouldn't be necessary to edit this at all.

import re
import typing as t
from datetime import datetime, timedelta
from datetime import datetime

import duckdb
import pyarrow as pa
Expand All @@ -26,6 +26,7 @@
from sqlglot import exp
from sqlmesh.core.dialect import parse_one


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -165,13 +166,19 @@ def bq_try_read_with_options(
max_results_per_query: int,
):
result = None
increment = timedelta(days=1)
# increment = timedelta(days=1)
# Exponential increments for reading from bigquery, in case the initial
# restriction is too small
while result is None:
result = bq_read_with_options(start, end, source_table, dest, project_id)
start = start - increment
increment = increment * 2
# while result is None:
# result = bq_read_with_options(start, end, source_table, dest, project_id)
# start = start - increment
# increment = increment * 2

result = bq_read_with_options(start, end, source_table, dest, project_id)
if result is None:
logger.warning(f"No results for {source_table} in {start} → {end}. Skipping.")
return None


return result.slice(
0,
Expand Down Expand Up @@ -247,7 +254,16 @@ def load_from_bq(
config = self._config

# Load the schema from BigQuery
table_schema = self._bqclient.get_table(source_table).schema
# table_schema = self._bqclient.get_table(source_table).schema
# Load the schema from BigQuery
try:
table_schema = self._bqclient.get_table(source_table).schema
except Exception as e:
# Gracefully skip if table or project doesn't exist
if "Not found" in str(e) or "404" in str(e) or "Project" in str(e):
logger.warning(f"Skipping {source_table.path} — table not found in BigQuery")
return
raise

if self.destination_table_exists(rewritten_destination):
if self.has_schema_changed(rewritten_destination, table_schema):
Expand Down Expand Up @@ -298,7 +314,13 @@ def load_from_bq(
table_as_arrow = rows.to_arrow(
create_bqstorage_client=True
) # noqa: F841

# ✅ GUARD: skip committing if no rows were read
if table_as_arrow is None:
logger.warning(
f"Skipping {source_name} -> {destination.table}: "
"no rows for the selected window/restrictions."
)
return
# Load the table
self.commit_table(
source_name,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
MODEL (
  name oso.project_pnl_quarterly_from_metrics_v1,
  description 'Quarterly P&L per project using metrics layer: income=funding_received, expense=funding_awarded.',
  dialect trino,
  kind FULL,
  partitioned_by year(quarter_start),
  tags ('model_stage=mart', 'entity_category=project'),
  audits (HAS_AT_LEAST_N_ROWS(threshold := 0))
);

-- Quarterly profit & loss per project, derived from the metrics layer.
-- "Income" rows carry metric_name = 'funding_received';
-- "Expense" rows carry metric_name = 'funding_awarded'.
WITH funding_events AS (
    -- One row per (project, sample_date, metric) funding observation.
    -- Projects missing from projects_v1 are labelled 'unknown'.
    SELECT
        t.sample_date                              AS sample_date,
        CAST(t.project_id AS VARCHAR)              AS project_id,
        LOWER(COALESCE(p.project_name, 'unknown')) AS project_name,
        LOWER(m.metric_name)                       AS metric_name,
        TRY_CAST(t.amount AS DOUBLE)               AS amount_usd
    FROM oso.timeseries_metrics_by_project_v0 AS t
    INNER JOIN oso.metrics_v0 AS m
        ON m.metric_id = t.metric_id
    LEFT JOIN oso.projects_v1 AS p
        ON p.project_id = t.project_id
    WHERE LOWER(m.metric_name) IN ('funding_received', 'funding_awarded')
),

quarterly AS (
    -- Pivot the two metrics into income/expense columns, aggregated
    -- per project per calendar quarter.
    SELECT
        DATE_TRUNC('quarter', CAST(sample_date AS TIMESTAMP)) AS quarter_start,
        YEAR(CAST(sample_date AS TIMESTAMP))                  AS year,
        CONCAT('Q', CAST(QUARTER(CAST(sample_date AS TIMESTAMP)) AS VARCHAR)) AS quarter_label,
        project_id,
        project_name,
        SUM(CASE WHEN metric_name = 'funding_received' THEN amount_usd ELSE 0 END) AS income_usd,
        SUM(CASE WHEN metric_name = 'funding_awarded' THEN amount_usd ELSE 0 END)  AS expense_usd
    FROM funding_events
    GROUP BY
        DATE_TRUNC('quarter', CAST(sample_date AS TIMESTAMP)),
        YEAR(CAST(sample_date AS TIMESTAMP)),
        CONCAT('Q', CAST(QUARTER(CAST(sample_date AS TIMESTAMP)) AS VARCHAR)),
        project_id,
        project_name
)

SELECT
    quarter_start,
    year,
    quarter_label,
    project_id,
    project_name,
    income_usd,
    expense_usd,
    (COALESCE(income_usd, 0) - COALESCE(expense_usd, 0)) AS net_usd
FROM quarterly
ORDER BY quarter_start, project_name;