From 4cdd369145573b68414a6cd1e9e9e5e8bdc6413a Mon Sep 17 00:00:00 2001 From: pavle-martinovic_data Date: Mon, 20 Oct 2025 17:58:41 +0200 Subject: [PATCH 1/2] [ML-52572] Refactor and add tests for covariate inference support in automl runtime for deepar --- .../automl_runtime/forecast/deepar/model.py | 20 +++- .../automl_runtime/forecast/deepar/utils.py | 8 +- .../forecast/deepar/utils_test.py | 109 +++++++++++++++++- 3 files changed, 131 insertions(+), 6 deletions(-) diff --git a/runtime/databricks/automl_runtime/forecast/deepar/model.py b/runtime/databricks/automl_runtime/forecast/deepar/model.py index 962759a..33c3d37 100644 --- a/runtime/databricks/automl_runtime/forecast/deepar/model.py +++ b/runtime/databricks/automl_runtime/forecast/deepar/model.py @@ -64,6 +64,8 @@ def __init__(self, model: PyTorchPredictor, horizon: int, frequency_unit: str, f :param time_col: the time column name :param id_cols: the column names of the identity columns for multi-series time series; None for single series :param feature_cols: the column names of the covariate feature columns; None if no covariates + :param split_col: Optional column name of the split columns + :param preprocess_func: Optional callable function for preprocessing input data """ super().__init__() @@ -100,10 +102,13 @@ def predict(self, # multi-series: combine id columns model_input_preprocessed['ts_id'] = model_input_preprocessed[self._id_cols].astype(str).agg('-'.join, axis=1) + # Apply the custom preprocessing function, which may use the split column + # (e.g., to filter or transform data differently for train/test) model_input_preprocessed = apply_preprocess_func(model_input_preprocessed, self._preprocess_func, self._split_col) - # Save target column from the original input + # Rejoin the original target column after preprocessing in case it was dropped or transformed. + # This ensures the model still has the correct target values for forecasting. model_input_preprocessed = model_input_preprocessed.join(model_input[self._target_col]) required_cols = [self._target_col, self._time_col] @@ -152,7 +157,10 @@ def predict_samples(self, # Prepare aggregation dictionary agg_dict = {self._target_col: "mean"} if self._feature_cols: - # For feature columns, take the mean as well (could also be first/last depending on use case) + # When grouping time series, aggregate feature columns as well. + # - Numeric features: averaged across duplicates (e.g. multiple sensors reporting same timestamp) + # - Categorical features: take the first value (arbitrary but consistent) + # This ensures a single feature vector per time step before passing to GluonTS. for feature_col in self._feature_cols: if pd.api.types.is_numeric_dtype(model_input[feature_col]): agg_dict[feature_col] = "mean" @@ -161,6 +169,8 @@ def predict_samples(self, model_input = model_input.groupby(group_cols).agg(agg_dict).reset_index() + # Ensure the time index is continuous and all covariates are aligned with target steps. + # This also fills missing timestamps, which DeepAR requires for consistent sequence input. model_input_transformed = set_index_and_fill_missing_time_steps(model_input, self._time_col, self._frequency_unit, @@ -168,12 +178,16 @@ def predict_samples(self, self._id_cols, self._feature_cols) if self._feature_cols: - # Your input can be a dict (multi-series) or a single DataFrame (single-series) + # GluonTS's PandasDataset does not support dynamic features. + # Switch to ListDataset when feature columns are provided. list_dataset = [] if isinstance(model_input_transformed, dict): # Multi-series: iterate over each series for ts_id, df in model_input_transformed.items(): + # GluonTS expects dynamic real-valued features with shape (num_features, time_length). + # Transpose from DataFrame shape (time_length, num_features) -> (num_features, time_length). + # These features must align exactly with each timestamp in the target. target_array = df[self._target_col].dropna().to_numpy() # keep NaNs for horizon feat_array = df[self._feature_cols].to_numpy().T # transpose for GluonTS list_dataset.append({ diff --git a/runtime/databricks/automl_runtime/forecast/deepar/utils.py b/runtime/databricks/automl_runtime/forecast/deepar/utils.py index 79bb528..2c5f43e 100644 --- a/runtime/databricks/automl_runtime/forecast/deepar/utils.py +++ b/runtime/databricks/automl_runtime/forecast/deepar/utils.py @@ -116,7 +116,9 @@ def set_index_and_fill_missing_time_steps( temp_df = temp_df.reindex(valid_index) # add missing timestamps if feature_cols: - # Only fill covariates + # Only forward/backward fill covariates. + # Do not fill the target column: DeepAR handles missing targets differently. + # This ensures covariate arrays have no NaNs and align with the full timestamp index. covars = [c for c in feature_cols if c in temp_df.columns] temp_df[covars] = temp_df[covars].ffill().bfill() @@ -133,7 +135,9 @@ def set_index_and_fill_missing_time_steps( df = df.reindex(valid_index) if feature_cols: - # Only fill covariates + # Only forward/backward fill covariates. + # Do not fill the target column: DeepAR handles missing targets differently. + # This ensures covariate arrays have no NaNs and align with the full timestamp index. covars = [c for c in feature_cols if c in df.columns] df[covars] = df[covars].ffill().bfill() diff --git a/runtime/tests/automl_runtime/forecast/deepar/utils_test.py b/runtime/tests/automl_runtime/forecast/deepar/utils_test.py index 9b16b26..86d2826 100644 --- a/runtime/tests/automl_runtime/forecast/deepar/utils_test.py +++ b/runtime/tests/automl_runtime/forecast/deepar/utils_test.py @@ -330,4 +330,111 @@ def test_multi_timeseries_month_start_index(self): expected_df = expected_df.set_index(time_col).rename_axis(None).to_period("M") # Assert equality self.assertEqual(transformed_df_dict.keys(), {'1', '2'}) - pd.testing.assert_frame_equal(transformed_df_dict["1"], expected_df) \ No newline at end of file + pd.testing.assert_frame_equal(transformed_df_dict["1"], expected_df) + + def test_uni_timeseries_with_covariates(self): + """Test that covariates are preserved, forward/backward filled, and aligned for multiple series""" + target_col = "sales" + time_col = "date" + feature_col = "promo" + num_training_months = 6 + num_future_months = 4 + num_months = num_training_months + num_future_months + + # Base dates + base_dates = pd.date_range(start="2020-01-01", periods=num_months, freq="MS") + + # Create a base dataframe for one store with covariate + df = pd.DataFrame({ + time_col: base_dates, + target_col: list(range(10, 10 + num_training_months)) + [None] * num_future_months, + feature_col: [None, 2, 3, 4, None, 6, 2, 3, 4, 5] # intentionally include NaNs + }) + + # Transform the dataframe with covariates + transformed = set_index_and_fill_missing_time_steps( + df, + time_col=time_col, + frequency_unit="MS", + frequency_quantity=1, + feature_cols=[feature_col] + ) + + # Assert index is PeriodIndex monthly + self.assertTrue(isinstance(transformed.index, pd.PeriodIndex)) + self.assertEqual(transformed.index.freqstr, "M") + + # Assert target column has NaNs filled in correct positions + self.assertTrue(pd.isna(transformed.loc[transformed.index[6], target_col])) + self.assertTrue(pd.isna(transformed.loc[transformed.index[7], target_col])) + self.assertTrue(pd.isna(transformed.loc[transformed.index[8], target_col])) + self.assertTrue(pd.isna(transformed.loc[transformed.index[9], target_col])) + + # Assert covariates are forward/backward filled + # For original NaNs in promo, they should be filled + self.assertFalse(pd.isna(transformed[feature_col]).any()) + + # Assert that the feature cols have the expected values + expected_feature = pd.Series([2.0, 2.0, 3.0, 4.0, 4.0, 6.0, 2.0, 3.0, 4.0, 5.0], name=feature_col, + index=transformed.index) + self.assertTrue(transformed[feature_col].equals(expected_feature)) + + def test_multi_timeseries_with_covariates(self): + """Test that covariates are preserved, forward/backward filled, and aligned for multiple series""" + target_col = "sales" + time_col = "date" + feature_col = "promo" + num_training_months = 6 + num_future_months = 4 + num_months = num_training_months + num_future_months + id_col = "store" + + # Base dates + base_dates = pd.date_range(start="2020-01-01", periods=num_months, freq="MS") + + # Create a base dataframe for one store with covariate + base_df = pd.DataFrame({ + time_col: base_dates, + target_col: list(range(10, 10 + num_training_months)) + [None] * num_future_months, + feature_col: [None, 2, None, 4, None, 6, None, None, None, None] # intentionally include NaNs + }) + + # Duplicate for second store with shifted covariate + df = pd.concat([base_df.copy(), base_df.copy()], ignore_index=True) + df[id_col] = [1] * num_months + [2] * num_months + + # Drop a couple of months to test missing time step filling + df = df.drop([2, 12]).reset_index(drop=True) + + # Transform the dataframe with covariates + transformed = set_index_and_fill_missing_time_steps( + df, + time_col=time_col, + frequency_unit="MS", + frequency_quantity=1, + id_cols=[id_col], + feature_cols=[feature_col] + ) + + # Assert keys exist for both series + self.assertEqual(set(transformed.keys()), {"1", "2"}) + + for ts_id in ["1", "2"]: + ts_df = transformed[ts_id] + + # Assert index is PeriodIndex monthly + self.assertTrue(isinstance(ts_df.index, pd.PeriodIndex)) + self.assertEqual(ts_df.index.freqstr, "M") + + # Assert target column has NaNs filled in correct positions + self.assertTrue(pd.isna(ts_df.loc[ts_df.index[2], target_col])) + self.assertTrue(pd.isna(ts_df.loc[ts_df.index[7], target_col])) + + # Assert covariates are forward/backward filled + # For original NaNs in promo, they should be filled + self.assertFalse(pd.isna(ts_df[feature_col]).any()) + + # Assert that the feature cols have the expected values + expected_feature = pd.Series([2.0, 2.0, 2.0, 4.0, 4.0, 6.0, 6.0, 6.0, 6.0, 6.0], name=feature_col, + index=ts_df.index) + self.assertTrue(ts_df[feature_col].equals(expected_feature)) From 9651030bc645bf4200a6e2dba45bbcced4cad850 Mon Sep 17 00:00:00 2001 From: Pavle Martinovic <34302662+Pajaraja@users.noreply.github.com> Date: Thu, 23 Oct 2025 17:35:53 +0200 Subject: [PATCH 2/2] Update runtime/tests/automl_runtime/forecast/deepar/utils_test.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- runtime/tests/automl_runtime/forecast/deepar/utils_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/tests/automl_runtime/forecast/deepar/utils_test.py b/runtime/tests/automl_runtime/forecast/deepar/utils_test.py index 86d2826..9224349 100644 --- a/runtime/tests/automl_runtime/forecast/deepar/utils_test.py +++ b/runtime/tests/automl_runtime/forecast/deepar/utils_test.py @@ -380,7 +380,7 @@ def test_uni_timeseries_with_covariates(self): self.assertTrue(transformed[feature_col].equals(expected_feature)) def test_multi_timeseries_with_covariates(self): - """Test that covariates are preserved, forward/backward filled, and aligned for multiple series""" + """Test that covariates are preserved, forward/backward filled, and aligned for multiple time series""" target_col = "sales" time_col = "date" feature_col = "promo"