diff --git a/.gitignore b/.gitignore index 5cee0d88c3..0792de5113 100644 --- a/.gitignore +++ b/.gitignore @@ -41,6 +41,7 @@ logs/ dev-tests/ coverage.json dev-tests/ +warehouse/oso_sqlmesh/models/intermediate/int_my_first_model.sql # typescript *.tsbuildinfo diff --git a/warehouse/metrics_tools/local/loader.py b/warehouse/metrics_tools/local/loader.py index cd19cff9e7..de90a887b5 100644 --- a/warehouse/metrics_tools/local/loader.py +++ b/warehouse/metrics_tools/local/loader.py @@ -3,7 +3,7 @@ import os import re import typing as t -from datetime import datetime, timedelta +from datetime import datetime import duckdb import pyarrow as pa @@ -26,6 +26,7 @@ from sqlglot import exp from sqlmesh.core.dialect import parse_one + logger = logging.getLogger(__name__) @@ -165,13 +166,19 @@ def bq_try_read_with_options( max_results_per_query: int, ): result = None - increment = timedelta(days=1) + # increment = timedelta(days=1) # Exponential increments for reading from bigquery, in case the initial # restriction is too small - while result is None: - result = bq_read_with_options(start, end, source_table, dest, project_id) - start = start - increment - increment = increment * 2 + # while result is None: + # result = bq_read_with_options(start, end, source_table, dest, project_id) + # start = start - increment + # increment = increment * 2 + + result = bq_read_with_options(start, end, source_table, dest, project_id) + if result is None: + logger.warning(f"No results for {source_table} in {start} → {end}. Skipping.") + return None + return result.slice( 0, @@ -247,7 +254,16 @@ def load_from_bq( config = self._config # Load the schema from bigqouery - table_schema = self._bqclient.get_table(source_table).schema + # table_schema = self._bqclient.get_table(source_table).schema + # Load the schema from BigQuery + try: + table_schema = self._bqclient.get_table(source_table).schema + except Exception as e: + # Gracefully skip if table or project doesn't exist + if "Not found" in str(e) or "404" in str(e) or "Project" in str(e): + logger.warning(f"Skipping {source_table.path} — table not found in BigQuery") + return + raise if self.destination_table_exists(rewritten_destination): if self.has_schema_changed(rewritten_destination, table_schema): @@ -298,7 +314,13 @@ def load_from_bq( table_as_arrow = rows.to_arrow( create_bqstorage_client=True ) # noqa: F841 - + # ✅ GUARD: skip committing if no rows were read + if table_as_arrow is None: + logger.warning( + f"Skipping {source_name} -> {destination.table}: " + "no rows for the selected window/restrictions." + ) + return # Load the table self.commit_table( source_name, diff --git a/warehouse/oso_sqlmesh/models/marts/finance/project_pnl_quarterly_from_metrics_v1.sql b/warehouse/oso_sqlmesh/models/marts/finance/project_pnl_quarterly_from_metrics_v1.sql new file mode 100644 index 0000000000..e0b955fda6 --- /dev/null +++ b/warehouse/oso_sqlmesh/models/marts/finance/project_pnl_quarterly_from_metrics_v1.sql @@ -0,0 +1,50 @@ +MODEL ( + name oso.project_pnl_quarterly_from_metrics_v1, + description 'Quarterly P&L per project using metrics layer: income=funding_received, expense=funding_awarded.', + dialect trino, + kind FULL, + partitioned_by year(quarter_start), + tags ('model_stage=mart', 'entity_category=project'), + audits (HAS_AT_LEAST_N_ROWS(threshold := 0)) +); + + +WITH src AS ( + SELECT + t.sample_date AS sample_date, + CAST(t.project_id AS VARCHAR) AS project_id, + LOWER(COALESCE(p.project_name, 'unknown')) AS project_name, + LOWER(m.metric_name) AS metric_name, + TRY_CAST(t.amount AS DOUBLE) AS amount_usd + FROM oso.timeseries_metrics_by_project_v0 t + JOIN oso.metrics_v0 m + ON m.metric_id = t.metric_id + LEFT JOIN oso.projects_v1 p + ON p.project_id = t.project_id + WHERE LOWER(m.metric_name) IN ('funding_received', 'funding_awarded') +), + +q AS ( + SELECT + DATE_TRUNC('quarter', CAST(sample_date AS TIMESTAMP)) AS quarter_start, + YEAR(CAST(sample_date AS TIMESTAMP)) AS year, + CONCAT('Q', CAST(QUARTER(CAST(sample_date AS TIMESTAMP)) AS VARCHAR)) AS quarter_label, + project_id, + project_name, + SUM(CASE WHEN metric_name = 'funding_received' THEN amount_usd ELSE 0 END) AS income_usd, + SUM(CASE WHEN metric_name = 'funding_awarded' THEN amount_usd ELSE 0 END) AS expense_usd + FROM src + GROUP BY 1,2,3,4,5 +) + +SELECT + quarter_start, + year, + quarter_label, + project_id, + project_name, + income_usd, + expense_usd, + (COALESCE(income_usd,0) - COALESCE(expense_usd,0)) AS net_usd +FROM q +ORDER BY quarter_start, project_name;