Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions runtime/databricks/automl_runtime/forecast/deepar/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def __init__(self, model: PyTorchPredictor, horizon: int, frequency_unit: str, f
:param time_col: the time column name
:param id_cols: the column names of the identity columns for multi-series time series; None for single series
:param feature_cols: the column names of the covariate feature columns; None if no covariates
:param split_col: Optional column name of the split columns
:param preprocess_func: Optional callable function for preprocessing input data
"""

super().__init__()
Expand Down Expand Up @@ -100,10 +102,13 @@ def predict(self,
# multi-series: combine id columns
model_input_preprocessed['ts_id'] = model_input_preprocessed[self._id_cols].astype(str).agg('-'.join,
axis=1)
# Apply the custom preprocessing function, which may use the split column
# (e.g., to filter or transform data differently for train/test)
model_input_preprocessed = apply_preprocess_func(model_input_preprocessed,
self._preprocess_func,
self._split_col)
# Save target column from the original input
# Rejoin the original target column after preprocessing in case it was dropped or transformed.
# This ensures the model still has the correct target values for forecasting.
model_input_preprocessed = model_input_preprocessed.join(model_input[self._target_col])

required_cols = [self._target_col, self._time_col]
Expand Down Expand Up @@ -152,7 +157,10 @@ def predict_samples(self,
# Prepare aggregation dictionary
agg_dict = {self._target_col: "mean"}
if self._feature_cols:
# For feature columns, take the mean as well (could also be first/last depending on use case)
# When grouping time series, aggregate feature columns as well.
# - Numeric features: averaged across duplicates (e.g. multiple sensors reporting same timestamp)
# - Categorical features: take the first value (arbitrary but consistent)
# This ensures a single feature vector per time step before passing to GluonTS.
for feature_col in self._feature_cols:
if pd.api.types.is_numeric_dtype(model_input[feature_col]):
agg_dict[feature_col] = "mean"
Expand All @@ -161,19 +169,25 @@ def predict_samples(self,

model_input = model_input.groupby(group_cols).agg(agg_dict).reset_index()

# Ensure the time index is continuous and all covariates are aligned with target steps.
# This also fills missing timestamps, which DeepAR requires for consistent sequence input.
model_input_transformed = set_index_and_fill_missing_time_steps(model_input,
self._time_col,
self._frequency_unit,
self._frequency_quantity,
self._id_cols,
self._feature_cols)
if self._feature_cols:
# Your input can be a dict (multi-series) or a single DataFrame (single-series)
# GluonTS's PandasDataset does not support dynamic features.
# Switch to ListDataset when feature columns are provided.
list_dataset = []

if isinstance(model_input_transformed, dict):
# Multi-series: iterate over each series
for ts_id, df in model_input_transformed.items():
# GluonTS expects dynamic real-valued features with shape (num_features, time_length).
# Transpose from DataFrame shape (time_length, num_features) -> (num_features, time_length).
# These features must align exactly with each timestamp in the target.
target_array = df[self._target_col].dropna().to_numpy() # keep NaNs for horizon
feat_array = df[self._feature_cols].to_numpy().T # transpose for GluonTS
list_dataset.append({
Expand Down
8 changes: 6 additions & 2 deletions runtime/databricks/automl_runtime/forecast/deepar/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ def set_index_and_fill_missing_time_steps(
temp_df = temp_df.reindex(valid_index) # add missing timestamps

if feature_cols:
# Only fill covariates
# Only forward/backward fill covariates.
# Do not fill the target column: DeepAR handles missing targets differently.
# This ensures covariate arrays have no NaNs and align with the full timestamp index.
covars = [c for c in feature_cols if c in temp_df.columns]
temp_df[covars] = temp_df[covars].ffill().bfill()

Expand All @@ -133,7 +135,9 @@ def set_index_and_fill_missing_time_steps(
df = df.reindex(valid_index)

if feature_cols:
# Only fill covariates
# Only forward/backward fill covariates.
# Do not fill the target column: DeepAR handles missing targets differently.
# This ensures covariate arrays have no NaNs and align with the full timestamp index.
covars = [c for c in feature_cols if c in df.columns]
df[covars] = df[covars].ffill().bfill()

Expand Down
109 changes: 108 additions & 1 deletion runtime/tests/automl_runtime/forecast/deepar/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,4 +330,111 @@ def test_multi_timeseries_month_start_index(self):
expected_df = expected_df.set_index(time_col).rename_axis(None).to_period("M")
# Assert equality
self.assertEqual(transformed_df_dict.keys(), {'1', '2'})
pd.testing.assert_frame_equal(transformed_df_dict["1"], expected_df)
pd.testing.assert_frame_equal(transformed_df_dict["1"], expected_df)

def test_uni_timeseries_with_covariates(self):
"""Test that covariates are preserved, forward/backward filled, and aligned for multiple series"""
target_col = "sales"
time_col = "date"
feature_col = "promo"
num_training_months = 6
num_future_months = 4
num_months = num_training_months + num_future_months

# Base dates
base_dates = pd.date_range(start="2020-01-01", periods=num_months, freq="MS")

# Create a base dataframe for one store with covariate
df = pd.DataFrame({
time_col: base_dates,
target_col: list(range(10, 10 + num_training_months)) + [None] * num_future_months,
feature_col: [None, 2, 3, 4, None, 6, 2, 3, 4, 5] # intentionally include NaNs
})

# Transform the dataframe with covariates
transformed = set_index_and_fill_missing_time_steps(
df,
time_col=time_col,
frequency_unit="MS",
frequency_quantity=1,
feature_cols=[feature_col]
)

# Assert index is PeriodIndex monthly
self.assertTrue(isinstance(transformed.index, pd.PeriodIndex))
self.assertEqual(transformed.index.freqstr, "M")

# Assert target column has NaNs filled in correct positions
self.assertTrue(pd.isna(transformed.loc[transformed.index[6], target_col]))
self.assertTrue(pd.isna(transformed.loc[transformed.index[7], target_col]))
self.assertTrue(pd.isna(transformed.loc[transformed.index[8], target_col]))
self.assertTrue(pd.isna(transformed.loc[transformed.index[9], target_col]))

# Assert covariates are forward/backward filled
# For original NaNs in promo, they should be filled
self.assertFalse(pd.isna(transformed[feature_col]).any())

# Assert that the feature cols have the expected values
expected_feature = pd.Series([2.0, 2.0, 3.0, 4.0, 4.0, 6.0, 2.0, 3.0, 4.0, 5.0], name=feature_col,
index=transformed.index)
self.assertTrue(transformed[feature_col].equals(expected_feature))

def test_multi_timeseries_with_covariates(self):
"""Test that covariates are preserved, forward/backward filled, and aligned for multiple time series"""
target_col = "sales"
time_col = "date"
feature_col = "promo"
num_training_months = 6
num_future_months = 4
num_months = num_training_months + num_future_months
id_col = "store"

# Base dates
base_dates = pd.date_range(start="2020-01-01", periods=num_months, freq="MS")

# Create a base dataframe for one store with covariate
base_df = pd.DataFrame({
time_col: base_dates,
target_col: list(range(10, 10 + num_training_months)) + [None] * num_future_months,
feature_col: [None, 2, None, 4, None, 6, None, None, None, None] # intentionally include NaNs
})

# Duplicate for second store with shifted covariate
df = pd.concat([base_df.copy(), base_df.copy()], ignore_index=True)
df[id_col] = [1] * num_months + [2] * num_months

# Drop a couple of months to test missing time step filling
df = df.drop([2, 12]).reset_index(drop=True)

# Transform the dataframe with covariates
transformed = set_index_and_fill_missing_time_steps(
df,
time_col=time_col,
frequency_unit="MS",
frequency_quantity=1,
id_cols=[id_col],
feature_cols=[feature_col]
)

# Assert keys exist for both series
self.assertEqual(set(transformed.keys()), {"1", "2"})

for ts_id in ["1", "2"]:
ts_df = transformed[ts_id]

# Assert index is PeriodIndex monthly
self.assertTrue(isinstance(ts_df.index, pd.PeriodIndex))
self.assertEqual(ts_df.index.freqstr, "M")

# Assert target column has NaNs filled in correct positions
self.assertTrue(pd.isna(ts_df.loc[ts_df.index[2], target_col]))
self.assertTrue(pd.isna(ts_df.loc[ts_df.index[7], target_col]))

# Assert covariates are forward/backward filled
# For original NaNs in promo, they should be filled
self.assertFalse(pd.isna(ts_df[feature_col]).any())

# Assert that the feature cols have the expected values
expected_feature = pd.Series([2.0, 2.0, 2.0, 4.0, 4.0, 6.0, 6.0, 6.0, 6.0, 6.0], name=feature_col,
index=ts_df.index)
self.assertTrue(ts_df[feature_col].equals(expected_feature))