From 7b38de4f68bf2e6a88a4d39bf9aeca3bbcf9c872 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Fri, 27 Jan 2023 23:51:35 -0800 Subject: [PATCH 1/2] Add support for optionally skipping features without data --- .../daily/2018/05/01/data.avro.json | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 feathr-impl/src/test/resources/slidingWindowAgg/missingFeatureData/daily/2018/05/01/data.avro.json diff --git a/feathr-impl/src/test/resources/slidingWindowAgg/missingFeatureData/daily/2018/05/01/data.avro.json b/feathr-impl/src/test/resources/slidingWindowAgg/missingFeatureData/daily/2018/05/01/data.avro.json new file mode 100644 index 000000000..70becc116 --- /dev/null +++ b/feathr-impl/src/test/resources/slidingWindowAgg/missingFeatureData/daily/2018/05/01/data.avro.json @@ -0,0 +1,149 @@ +{ + "schema": { + "type": "record", + "name": "NTVInput", + "doc": "Daily or multi-day aggregated a activity features generated from similar data sources.", + "namespace": "com.linkedin.feathr.offline.data", + "fields": [ + { + "name": "x", + "type": "string", + "doc": "Id of the a" + }, + { + "name": "features", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "Feature", + "doc": "a feature is a named numeric value", + "fields": [ + { + "name": "name", + "type": "string", + "doc": "name of the aggregation, e.g. jobDaily" + }, + { + "name": "term", + "type": "string", + "doc": "The specific subtype of the feature. If not null, this represents a hierarchy of features under the same name." + }, + { + "name": "value", + "type": "float", + "doc": "The value of the relevance feature." + } + ] + }, + "default": [] + } + }, + { + "name": "y", + "type": { + "type": "array", + "items": "string" + } + }, + { + "name": "timestamp", + "type": "string", + "doc": "The date when the features are aggregated from in format of yyyy-MM-dd(Pacific Time). It is also the end date of aggregation." + }, + { + "name": "aggregationWindow", + "type": [ + "null", + "int" + ], + "default": null, + "doc": "Length of days for the activity aggregation features. By default, it's daily aggregation." 
+ } + ] + }, + "data": [ + { + "x": "a5", + "y": [ + "a1", + "a7" + ], + "features": [ + { + "name": "f1", + "term": "f1t1", + "value": 5.0 + }, + { + "name": "f1", + "term": "f1t2", + "value": 6.0 + }, + { + "name": "f2", + "term": "f2t1", + "value": 7.0 + } + ], + "timestamp": "2018-05-01", + "aggregationWindow": null + }, + { + "x": "a4", + "y": [ + "a1", + "a7" + ], + "features": [ + { + "name": "f1", + "term": "f1t1", + "value": 5.0 + }, + { + "name": "f1", + "term": "f1t2", + "value": 6.0 + }, + { + "name": "f2", + "term": "f2t1", + "value": 7.0 + } + ], + "timestamp": "2018-05-01", + "aggregationWindow": { + "int": 111 + } + }, + { + "x": "a3", + "y": [ + "a1", + "a7" + ], + "features": [ + { + "name": "f1", + "term": "f1t1", + "value": 5.0 + }, + { + "name": "f1", + "term": "f1t2", + "value": 6.0 + }, + { + "name": "f2", + "term": "f2t1", + "value": 7.0 + } + ], + "timestamp": "2018-05-01", + "aggregationWindow": { + "int": 333 + } + } + ] +} \ No newline at end of file From aac0fac7c520ee85113d789379d9fab9c9df59fb Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Sat, 28 Jan 2023 00:15:57 -0800 Subject: [PATCH 2/2] Add support for skipping features --- .../SequentialJoinAsDerivation.scala | 2 +- .../datasource/DataSourceNodeEvaluator.scala | 4 +- .../feathr/offline/job/FeatureGenJob.scala | 2 +- .../feathr/offline/job/FeatureJoinJob.scala | 2 +- .../offline/join/DataFrameFeatureJoiner.scala | 4 +- .../source/accessor/DataSourceAccessor.scala | 28 +++++--- ...hPartitionedTimeSeriesSourceAccessor.scala | 12 ++-- .../swa/SlidingWindowAggregationJoiner.scala | 23 ++++-- .../AnchorToDataSourceMapper.scala | 18 ++--- .../feathr/offline/util/FeathrUtils.scala | 4 +- .../offline/SlidingWindowAggIntegTest.scala | 71 +++++++++++++++++++ .../location/TestSparkSqlLocation.scala | 2 +- .../accessor/TestDataSourceAccessor.scala | 2 +- .../TestAnchorToDataSourceMapper.scala | 8 +-- .../feathr/offline/util/TestDataSource.scala | 4 +- 15 files changed, 141 insertions(+), 45 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala index 3be7b6d61..ddc5e0d19 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala @@ -219,7 +219,7 @@ private[offline] class SequentialJoinAsDerivation(ss: SparkSession, val anchorDFMap1 = anchorToDataSourceMapper.getBasicAnchorDFMapForJoin(ss, Seq(featureAnchor), failOnMissingPartition) val featureInfo = FeatureTransformation.directCalculate( anchorGroup: AnchorFeatureGroups, - anchorDFMap1(featureAnchor), + anchorDFMap1(featureAnchor).get, featureAnchor.featureAnchor.sourceKeyExtractor, None, None, diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/evaluator/datasource/DataSourceNodeEvaluator.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/evaluator/datasource/DataSourceNodeEvaluator.scala index 9b11444ed..55c230f52 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/evaluator/datasource/DataSourceNodeEvaluator.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/evaluator/datasource/DataSourceNodeEvaluator.scala @@ -70,7 +70,7 @@ object DataSourceNodeEvaluator extends NodeEvaluator{ val timeStampExpr = 
constructTimeStampExpr(timeWindowParam.timestampColumn, timeWindowParam.timestampColumnFormat) val needTimestampColumn = if (dataSourceNode.hasTimestampColumnInfo) false else true val dataSourceAccessor = DataSourceAccessor(ss, source, timeRange, None, failOnMissingPartition = false, needTimestampColumn, dataPathHandlers = dataPathHandlers) - val sourceDF = dataSourceAccessor.get() + val sourceDF = dataSourceAccessor.get.get() val (df, keyExtractor, timestampExpr) = if (dataSourceNode.getKeyExpressionType == KeyExpressionType.UDF) { val className = Class.forName(dataSourceNode.getKeyExpression()) val keyExtractorClass = className.newInstance match { @@ -110,7 +110,7 @@ object DataSourceNodeEvaluator extends NodeEvaluator{ // Augment time information also here. Table node should not have time info? val dataSource = com.linkedin.feathr.offline.source.DataSource(path, SourceFormatType.FIXED_PATH) val dataSourceAccessor = DataSourceAccessor(ss, dataSource, None, None, failOnMissingPartition = false, dataPathHandlers = dataPathHandlers) - val sourceDF = dataSourceAccessor.get() + val sourceDF = dataSourceAccessor.get.get() val (df, keyExtractor) = if (dataSourceNode.getKeyExpressionType == KeyExpressionType.UDF) { val className = Class.forName(dataSourceNode.getKeyExpression()) className.newInstance match { diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala index b9098a647..6980eeb62 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala @@ -238,7 +238,7 @@ object FeatureGenJob { // For example, f1, f2 belongs to anchor. Then Map("f1,f2"-> anchor) val dataFrameMapForPreprocessing = anchorsWithSource .filter(x => featureNamesInAnchorSet.contains(x._1.featureAnchor.features.toSeq.sorted.mkString(","))) - .map(x => (x._1.featureAnchor.features.toSeq.sorted.mkString(","), x._2.get())) + .map(x => (x._1.featureAnchor.features.toSeq.sorted.mkString(","), x._2.get.get())) // Pyspark only understand Java map so we need to convert Scala map back to Java map. dataFrameMapForPreprocessing.asJava diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala index 3f3f7be05..ef78da03b 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala @@ -384,7 +384,7 @@ object FeatureJoinJob { // For example, f1, f2 belongs to anchor. Then Map("f1,f2"-> anchor) val dataFrameMapForPreprocessing = anchorsWithSource .filter(x => featureNamesInAnchorSet.contains(x._1.featureAnchor.features.toSeq.sorted.mkString(","))) - .map(x => (x._1.featureAnchor.features.toSeq.sorted.mkString(","), x._2.get())) + .map(x => (x._1.featureAnchor.features.toSeq.sorted.mkString(","), x._2.get.get())) // Pyspark only understand Java map so we need to convert Scala map back to Java map. 
dataFrameMapForPreprocessing.asJava diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala index a03abc83c..098cbe7d6 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala @@ -190,6 +190,8 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d .map(featureGroups.allAnchoredFeatures), failOnMissingPartition) + val updatedAnchorSourceAccessorMap = anchorSourceAccessorMap.filter(x => x._2.isDefined).map(x => x._1 -> x._2.get) + implicit val joinExecutionContext: JoinExecutionContext = JoinExecutionContext(ss, logicalPlan, featureGroups, bloomFilters, Some(saltedJoinFrequentItemDFs)) // 3. Join sliding window aggregation features @@ -210,7 +212,7 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d SparkJoinWithJoinCondition(EqualityJoinConditionBuilder), mvelContext) } val FeatureDataFrameOutput(FeatureDataFrame(withAllBasicAnchoredFeatureDF, inferredBasicAnchoredFeatureTypes)) = - anchoredFeatureJoinStep.joinFeatures(requiredRegularFeatureAnchors, AnchorJoinStepInput(withWindowAggFeatureDF, anchorSourceAccessorMap)) + anchoredFeatureJoinStep.joinFeatures(requiredRegularFeatureAnchors, AnchorJoinStepInput(withWindowAggFeatureDF, updatedAnchorSourceAccessorMap)) // 5. If useSlickJoin, restore(join back) all observation fields before we evaluate post derived features, sequential join and passthrough // anchored features, as they might require other columns in the original observation data, while the current observation // dataset does not have these fields (were removed in the preProcessObservation) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/DataSourceAccessor.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/DataSourceAccessor.scala index e7cd08a42..7e44e7526 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/DataSourceAccessor.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/DataSourceAccessor.scala @@ -4,7 +4,7 @@ import com.linkedin.feathr.offline.source.dataloader.DataLoaderFactory import com.linkedin.feathr.offline.source.pathutil.{PathChecker, TimeBasedHdfsPathAnalyzer} import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler import com.linkedin.feathr.offline.source.{DataSource, SourceFormatType} -import com.linkedin.feathr.offline.util.PartitionLimiter +import com.linkedin.feathr.offline.util.{FeathrUtils, PartitionLimiter} import com.linkedin.feathr.offline.util.datetime.DateTimeInterval import org.apache.spark.sql.{DataFrame, SparkSession} @@ -48,21 +48,22 @@ private[offline] object DataSourceAccessor { failOnMissingPartition: Boolean, addTimestampColumn: Boolean = false, isStreaming: Boolean = false, - dataPathHandlers: List[DataPathHandler]): DataSourceAccessor = { //TODO: Add tests + dataPathHandlers: List[DataPathHandler]): Option[DataSourceAccessor] = { //TODO: Add tests val dataAccessorHandlers: List[DataAccessorHandler] = dataPathHandlers.map(_.dataAccessorHandler) val dataLoaderHandlers: List[DataLoaderHandler] = dataPathHandlers.map(_.dataLoaderHandler) val sourceType = source.sourceType val dataLoaderFactory = DataLoaderFactory(ss, isStreaming, dataLoaderHandlers) + val skipFeature = 
      FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean
     if (isStreaming) {
-      new StreamDataSourceAccessor(ss, dataLoaderFactory, source)
+      Some(new StreamDataSourceAccessor(ss, dataLoaderFactory, source))
     } else if (dateIntervalOpt.isEmpty || sourceType == SourceFormatType.FIXED_PATH || sourceType == SourceFormatType.LIST_PATH) {
       // if no input interval, or the path is fixed or list, load whole dataset
-      new NonTimeBasedDataSourceAccessor(ss, dataLoaderFactory, source, expectDatumType)
+      Some(new NonTimeBasedDataSourceAccessor(ss, dataLoaderFactory, source, expectDatumType))
     } else {
       import scala.util.control.Breaks._
-
+
       val timeInterval = dateIntervalOpt.get
       var dataAccessorOpt: Option[DataSourceAccessor] = None
       breakable {
@@ -74,8 +75,12 @@ private[offline] object DataSourceAccessor {
         }
       }
       val dataAccessor = dataAccessorOpt match {
-        case Some(dataAccessor) => dataAccessor
-        case _ => createFromHdfsPath(ss, source, timeInterval, expectDatumType, failOnMissingPartition, addTimestampColumn, dataLoaderHandlers)
+        case Some(dataAccessor) => dataAccessorOpt
+        case _ => try {
+          Some(createFromHdfsPath(ss, source, timeInterval, expectDatumType, failOnMissingPartition, addTimestampColumn, dataLoaderHandlers))
+        } catch {
+          case e: Exception => if (!skipFeature) throw e else None
+        }
       }
       dataAccessor
     }
@@ -106,6 +111,7 @@ private[offline] object DataSourceAccessor {
     val partitionLimiter = new PartitionLimiter(ss)
     val pathAnalyzer = new TimeBasedHdfsPathAnalyzer(pathChecker, dataLoaderHandlers)
     val fileName = new File(source.path).getName
+    val skipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean
     if (source.timePartitionPattern.isDefined) {
       // case 1: the timePartitionPattern exists
       val pathInfo = pathAnalyzer.analyze(source.path, source.timePartitionPattern.get)
@@ -117,7 +123,8 @@ private[offline] object DataSourceAccessor {
         source,
         timeInterval,
         failOnMissingPartition,
-        addTimestampColumn)
+        addTimestampColumn,
+        skipFeature)
     } else {
       // legacy configurations without timePartitionPattern
       if (fileName.endsWith("daily") || fileName.endsWith("hourly") || source.sourceType == SourceFormatType.TIME_PATH) {
@@ -131,7 +138,8 @@ private[offline] object DataSourceAccessor {
           source,
           timeInterval,
           failOnMissingPartition,
-          addTimestampColumn)
+          addTimestampColumn,
+          skipFeature)
       } else {
         // case 3: load as whole dataset
         new NonTimeBasedDataSourceAccessor(ss, fileLoaderFactory, source, expectDatumType)
@@ -147,7 +155,7 @@ private[offline] object DataSourceAccessor {
  */
 private[offline] case class DataAccessorHandler(
     validatePath: String => Boolean,
-    getAccessor:
+    getAccessor: (
       SparkSession,
       DataSource,
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala
index 9948d42c9..0972e7120 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala
@@ -7,7 +7,7 @@ import com.linkedin.feathr.offline.source.dataloader.DataLoaderFactory
 import com.linkedin.feathr.offline.source.pathutil.{PathChecker, PathInfo, TimeBasedHdfsPathGenerator}
 import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils
 import
com.linkedin.feathr.offline.transformation.DataFrameExt._ -import com.linkedin.feathr.offline.util.PartitionLimiter +import com.linkedin.feathr.offline.util.{FeathrUtils, PartitionLimiter} import com.linkedin.feathr.offline.util.datetime.{DateTimeInterval, OfflineDateTimeUtils} import org.apache.log4j.Logger import org.apache.spark.sql.DataFrame @@ -122,6 +122,7 @@ private[offline] object PathPartitionedTimeSeriesSourceAccessor { * @param timeInterval timespan of dataset * @param failOnMissingPartition whether to fail the file loading if some of the date partitions are missing. * @param addTimestampColumn whether to create a timestamp column from the time partition of the source. + * @param skipFeature if feature data is not present, boolean var to see if this feature should be skipped. * @return a TimeSeriesSource */ def apply( @@ -132,23 +133,25 @@ private[offline] object PathPartitionedTimeSeriesSourceAccessor { source: DataSource, timeInterval: DateTimeInterval, failOnMissingPartition: Boolean, - addTimestampColumn: Boolean): DataSourceAccessor = { + addTimestampColumn: Boolean, + skipFeature: Boolean): DataSourceAccessor = { val pathGenerator = new TimeBasedHdfsPathGenerator(pathChecker) val dateTimeResolution = pathInfo.dateTimeResolution val postPath = source.postPath val postfixPath = if(postPath.isEmpty || postPath.startsWith("/")) postPath else "/" + postPath val pathList = pathGenerator.generate(pathInfo, timeInterval, !failOnMissingPartition, postfixPath) val timeFormatString = pathInfo.datePathPattern - val dataframes = pathList.map(path => { val timeStr = path.substring(path.length - (timeFormatString.length + postfixPath.length), path.length - postfixPath.length) val time = OfflineDateTimeUtils.createTimeFromString(timeStr, timeFormatString) val interval = DateTimeInterval.createFromInclusive(time, time, dateTimeResolution) + val df = fileLoaderFactory.create(path).loadDataFrame() (df, interval) }) - if (dataframes.isEmpty) { + + if (dataframes.isEmpty && !skipFeature) { val errMsg = s"Input data is empty for creating TimeSeriesSource. No available " + s"date partition exist in HDFS for path ${pathInfo.basePath} between ${timeInterval.getStart} and ${timeInterval.getEnd} " val errMsgPf = errMsg + s"with postfix path ${postfixPath}" @@ -160,6 +163,7 @@ private[offline] object PathPartitionedTimeSeriesSourceAccessor { ErrorLabel.FEATHR_USER_ERROR, errMsgPf) } } + val datePartitions = dataframes.map { case (df, interval) => DatePartition(df, interval) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala index cccdfcd43..b456d281f 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala @@ -115,6 +115,8 @@ private[offline] class SlidingWindowAggregationJoiner( case (source, grouped) => (source, grouped.map(_._2)) }) + val notJoinedFeatures = new mutable.HashSet[String]() + // For each source, we calculate the maximum window duration that needs to be loaded across all // required SWA features defined on this source. // Then we load the source only once. 
@@ -140,10 +142,13 @@ private[offline] class SlidingWindowAggregationJoiner( maxDurationPerSource, featuresToDelayImmutableMap.values.toArray, failOnMissingPartition) - + if (originalSourceDf.isEmpty) { + res.map(notJoinedFeatures.add) + anchors.map(anchor => (anchor, originalSourceDf)) + } else { val sourceDF: DataFrame = preprocessedDf match { case Some(existDf) => existDf - case None => originalSourceDf + case None => originalSourceDf.get } // all the anchors here have same key sourcekey extractor, so we just use the first one to generate key column and share @@ -155,13 +160,17 @@ private[offline] class SlidingWindowAggregationJoiner( case keyExtractor => keyExtractor.appendKeyColumns(sourceDF) } - anchors.map(anchor => (anchor, withKeyDF)) - }) + anchors.map(anchor => (anchor, Some(withKeyDF))) + }} + ) + + val updatedWindowAggAnchorDFMap = windowAggAnchorDFMap.filter(x => x._2.isDefined).map(x => x._1 ->x._2.get) val allInferredFeatureTypes = mutable.Map.empty[String, FeatureTypeConfig] windowAggFeatureStages.foreach({ case (keyTags: Seq[Int], featureNames: Seq[String]) => + if (!featureNames.diff(notJoinedFeatures.toSeq).isEmpty) { val stringKeyTags = keyTags.map(keyTagList).map(k => s"CAST (${k} AS string)") // restore keyTag to column names in join config // get the bloom filter for the key combinations in this stage @@ -188,10 +197,10 @@ private[offline] class SlidingWindowAggregationJoiner( s"${labelDataDef.dataSource.collect().take(3).map(_.toString()).mkString("\n ")}") } val windowAggAnchorsThisStage = featureNames.map(allWindowAggFeatures) - val windowAggAnchorDFThisStage = windowAggAnchorDFMap.filterKeys(windowAggAnchorsThisStage.toSet) + val windowAggAnchorDFThisStage = updatedWindowAggAnchorDFMap.filterKeys(windowAggAnchorsThisStage.toSet) val factDataDefs = - SlidingWindowFeatureUtils.getSWAAnchorGroups(windowAggAnchorDFThisStage).map { + SlidingWindowFeatureUtils.getSWAAnchorGroups(updatedWindowAggAnchorDFMap).map { anchorWithSourceToDFMap => val selectedFeatures = anchorWithSourceToDFMap.keySet.flatMap(_.selectedFeatures).filter(featureNames.contains(_)) val factData = anchorWithSourceToDFMap.head._2 @@ -244,7 +253,7 @@ private[offline] class SlidingWindowAggregationJoiner( s"${factDataDef.dataSource.collect().take(3).map(_.toString()).mkString("\n ")}") } } - }) + }}) offline.FeatureDataFrame(contextDF, allInferredFeatureTypes.toMap) } diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index 4a06bb2d9..5ad1b0483 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -34,7 +34,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH def getBasicAnchorDFMapForJoin( ss: SparkSession, requiredFeatureAnchors: Seq[FeatureAnchorWithSource], - failOnMissingPartition: Boolean): Map[FeatureAnchorWithSource, DataSourceAccessor] = { + failOnMissingPartition: Boolean): Map[FeatureAnchorWithSource, Option[DataSourceAccessor]] = { // get a Map from each source to a list of all anchors based on this source val sourceToAnchor = requiredFeatureAnchors .map(anchor => (anchor.source, anchor)) @@ -63,12 +63,12 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH } } val timeSeriesSource = 
DataSourceAccessor(ss = ss, - source = source, - dateIntervalOpt = dateInterval, - expectDatumType = Some(expectDatumType), + source = source, + dateIntervalOpt = dateInterval, + expectDatumType = Some(expectDatumType), failOnMissingPartition = failOnMissingPartition, dataPathHandlers = dataPathHandlers) - + anchorsWithDate.map(anchor => (anchor, timeSeriesSource)) }) } @@ -91,7 +91,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH obsTimeRange: DateTimeInterval, window: Duration, timeDelays: Array[Duration], - failOnMissingPartition: Boolean): DataFrame = { + failOnMissingPartition: Boolean): Option[DataFrame] = { val dataLoaderHandlers: List[DataLoaderHandler] = dataPathHandlers.map(_.dataLoaderHandler) @@ -119,7 +119,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH failOnMissingPartition = failOnMissingPartition, addTimestampColumn = needCreateTimestampColumn, dataPathHandlers = dataPathHandlers) - timeSeriesSource.get() + if (timeSeriesSource.isDefined) Some(timeSeriesSource.get.get()) else None } /** @@ -171,8 +171,8 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH addTimestampColumn = needCreateTimestampColumn, isStreaming = isStreaming, dataPathHandlers = dataPathHandlers) - - anchors.map(anchor => (anchor, timeSeriesSource)) + + anchors.map(anchor => (anchor, timeSeriesSource.get)) }) } diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala index c0cdc2d05..ea639145a 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala @@ -27,6 +27,7 @@ private[offline] object FeathrUtils { */ val SEQ_JOIN_ARRAY_EXPLODE_ENABLED = "seq.join.array.explode.enabled" val ENABLE_SALTED_JOIN = "enable.salted.join" + val SKIP_MISSING_FEATURE = "skip.missing.feature" val SALTED_JOIN_FREQ_ITEM_THRESHOLD = "salted.join.freq.item.threshold" val SALTED_JOIN_FREQ_ITEM_ESTIMATOR = "salted.join.freq.item.estimator" val SALTED_JOIN_PERSIST = "salted.join.persist" @@ -45,9 +46,10 @@ private[offline] object FeathrUtils { CHECKPOINT_OUTPUT_PATH -> "/tmp/feathr/checkpoints", ENABLE_CHECKPOINT -> "false", DEBUG_OUTPUT_PART_NUM -> "200", - FAIL_ON_MISSING_PARTITION -> "false", + FAIL_ON_MISSING_PARTITION -> "true", SEQ_JOIN_ARRAY_EXPLODE_ENABLED -> "true", ENABLE_SALTED_JOIN -> "false", + SKIP_MISSING_FEATURE -> "true", // If one key appears more than 0.02% in the dataset, we will salt this join key and split them into multiple partitions // This is an empirical value SALTED_JOIN_FREQ_ITEM_THRESHOLD -> "0.0002", diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala index 965052432..e4a78af14 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala @@ -315,6 +315,77 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { assertEquals(df.getAs[Float]("simpleFeature"), 20f) } + /** + * SWA test with default values + */ + @Test + def testSWAWithMissingFeatureData(): Unit = { + val joinConfigAsString = + """ + | settings: { + | observationDataTimeSettings: { + | absoluteTimeRange: { + | timeFormat: yyyy-MM-dd + | 
startTime: "2018-05-01" + | endTime: "2018-05-03" + | } + | } + | joinTimeSettings: { + | timestampColumn: { + | def: timestamp + | format: yyyy-MM-dd + | } + | } + |} + | + |features: [ + | { + | key: [x], + | featureList: ["simplePageViewCount", "simpleFeature"] + | } + |] + """.stripMargin + val featureDefAsString = + """ + |sources: { + | swaSource: { + | location: { path: "slidingWindowAgg/missingFeatureData/daily" } + | timePartitionPattern: "yyyy/MM/dd" + | timeWindowParameters: { + | timestampColumn: "timestamp" + | timestampColumnFormat: "yyyy-MM-dd" + | } + | } + |} + | + |anchors: { + | swaAnchor: { + | source: "swaSource" + | key: "x" + | features: { + | simplePageViewCount: { + | def: "aggregationWindow" + | aggregation: COUNT + | window: 3d + | default: 10 + | } + | simpleFeature: { + | def: "aggregationWindow" + | aggregation: COUNT + | window: 3d + | default: 20 + | } + | } + | } + |} + """.stripMargin + val res = runLocalFeatureJoinForTest(joinConfigAsString, featureDefAsString, observationDataPath = "slidingWindowAgg/localAnchorTestObsData.avro.json").data + res.show() + val df = res.collect()(0) + assertEquals(df.getAs[Float]("simplePageViewCount"), 10f) + assertEquals(df.getAs[Float]("simpleFeature"), 20f) + } + /** * test SWA with simulate time expressed as days and hours */ diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/config/location/TestSparkSqlLocation.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/config/location/TestSparkSqlLocation.scala index 35e1d876f..7a72f3deb 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/config/location/TestSparkSqlLocation.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/config/location/TestSparkSqlLocation.scala @@ -32,7 +32,7 @@ class TestSparkSqlLocation extends TestFeathr{ val loc = SparkSqlLocation(sql = Some("select * from test_spark_sql_table order by users asc")) val dataSource = DataSource(loc, SourceFormatType.FIXED_PATH) val accessor = DataSourceAccessor(ss, dataSource, None, None, failOnMissingPartition = false, dataPathHandlers=List()) - val df = accessor.get() + val df = accessor.get.get() val expectedRows = Array( Row("Scala", 3000), Row("Java", 20000), diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/accessor/TestDataSourceAccessor.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/accessor/TestDataSourceAccessor.scala index d0e28d1cc..f41793787 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/accessor/TestDataSourceAccessor.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/accessor/TestDataSourceAccessor.scala @@ -79,6 +79,6 @@ class TestDataSourceAccessor extends TestFeathr { val accessor = DataSourceAccessor(ss=ss, source=source, dateIntervalOpt=sourceInterval, expectDatumType=None, failOnMissingPartition = false, dataPathHandlers=List()) assertTrue(accessor.isInstanceOf[NonTimeBasedDataSourceAccessor]) - assertEquals(accessor.get().count(), 10L) + assertEquals(accessor.get.get().count(), 10L) } } diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/transformation/TestAnchorToDataSourceMapper.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/transformation/TestAnchorToDataSourceMapper.scala index 1f0e41887..58986b683 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/transformation/TestAnchorToDataSourceMapper.scala +++ 
b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/transformation/TestAnchorToDataSourceMapper.scala @@ -139,7 +139,7 @@ class TestAnchorToDataSourceMapper extends TestFeathr with MockitoSugar { false) // for systematic comparison, sort both original and loaded data by (timestamp, mId, value) - val anchorData = anchorDF.rdd.collect().toSeq.sortBy(row => (row.getLong(1), row.getLong(0), row.getFloat(2))) // sort by timestamp then by mId + val anchorData = anchorDF.get.rdd.collect().toSeq.sortBy(row => (row.getLong(1), row.getLong(0), row.getFloat(2))) // sort by timestamp then by mId val sortedRecords = records2018011002.sortBy(r => (r.get("timestamp").asInstanceOf[Long], r.get("mId").asInstanceOf[Long], r.get("value").asInstanceOf[Float])) assertEquals(anchorData.size, 2) @@ -158,7 +158,7 @@ class TestAnchorToDataSourceMapper extends TestFeathr with MockitoSugar { simulateTimeDelays, false) - val anchorData2 = anchorDF2.rdd.collect().toSeq.sortBy(row => (row.getLong(1), row.getLong(0), row.getFloat(2))) + val anchorData2 = anchorDF2.get.rdd.collect().toSeq.sortBy(row => (row.getLong(1), row.getLong(0), row.getFloat(2))) val sortedRecords2 = (records2018011002 ++ records2018011003).sortBy(r => (r.get("timestamp").asInstanceOf[Long], r.get("mId").asInstanceOf[Long], r.get("value").asInstanceOf[Float])) assertEquals(anchorData2.size, 4) @@ -207,7 +207,7 @@ class TestAnchorToDataSourceMapper extends TestFeathr with MockitoSugar { simulateTimeDelays, failOnMissingPartition = false) - val anchorData = anchorDF.rdd.collect().toSeq.sortBy(row => (row.getLong(1), row.getLong(0), row.getFloat(2))) // sort by timestamp then by mId + val anchorData = anchorDF.get.rdd.collect().toSeq.sortBy(row => (row.getLong(1), row.getLong(0), row.getFloat(2))) // sort by timestamp then by mId val sortedRecords = (records20180110 ++ records20180111).sortBy(r => (r.get("timestamp").asInstanceOf[Long], r.get("mId").asInstanceOf[Long], r.get("value").asInstanceOf[Float])) assertEquals(anchorData.size, 4) @@ -228,7 +228,7 @@ class TestAnchorToDataSourceMapper extends TestFeathr with MockitoSugar { simulateTimeDelays, failOnMissingPartition = false) - val anchorData2 = anchorDF2.rdd.collect().toSeq.sortBy(row => (row.getLong(1), row.getLong(0), row.getFloat(2))) + val anchorData2 = anchorDF2.get.rdd.collect().toSeq.sortBy(row => (row.getLong(1), row.getLong(0), row.getFloat(2))) val sortedRecords2 = (records20180110 ++ records20180111).sortBy(r => (r.get("timestamp").asInstanceOf[Long], r.get("mId").asInstanceOf[Long], r.get("value").asInstanceOf[Float])) assertEquals(anchorData2.size, 4) diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/util/TestDataSource.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/util/TestDataSource.scala index 1403047da..b888feca9 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/util/TestDataSource.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/util/TestDataSource.scala @@ -46,7 +46,7 @@ class TestDataSource extends TestFeathr { createIntervalFromLocalTime(date2018011001.plusMinutes(30L), date2018011002.plusMinutes(59L)), Duration.ofDays(1), simulateTimeDelays) - val anchorDF1 = DataSourceAccessor(ss=ss, source=mockSource, dateIntervalOpt=Some(timeRange1), expectDatumType=None, failOnMissingPartition = false, dataPathHandlers=List()).get() + val anchorDF1 = DataSourceAccessor(ss=ss, source=mockSource, dateIntervalOpt=Some(timeRange1), expectDatumType=None, failOnMissingPartition = false, 
dataPathHandlers=List()).get.get()
     assertEquals(anchorDF1.count(), 8)

     // test: get 1 day of data
@@ -56,7 +56,7 @@ class TestDataSource extends TestFeathr {
       createIntervalFromLocalTime(date2018011003.plusMinutes(30L), date2018011004.plusMinutes(59L)),
       Duration.ofDays(3),
       simulateTimeDelays)
-    val anchorDF2 = DataSourceAccessor(ss=ss, source=mockSource, dateIntervalOpt=Some(timeRange2), expectDatumType=None, failOnMissingPartition = false, dataPathHandlers=List()).get()
+    val anchorDF2 = DataSourceAccessor(ss=ss, source=mockSource, dateIntervalOpt=Some(timeRange2), expectDatumType=None, failOnMissingPartition = false, dataPathHandlers=List()).get.get()
     assertEquals(anchorDF2.count(), 13)
   }
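
// The tests above exercise the end-to-end behaviour; the snippet below is a minimal,
// self-contained sketch of the Option-based "skip missing feature data" pattern that this
// patch threads through DataSourceAccessor and the sliding-window joiner. The names here
// (SkipMissingFeatureSketch, Accessor, loadAccessor, availablePartitions) are illustrative
// placeholders, not the actual Feathr API; only the try/catch-to-Option shape mirrors the change.
object SkipMissingFeatureSketch {

  final case class Accessor(path: String)

  // Pretend loader: only one date partition exists; any other path fails, mimicking
  // createFromHdfsPath when no data is found for the requested time range.
  private val availablePartitions = Set("daily/2018/05/01")

  def loadAccessor(path: String): Accessor =
    if (availablePartitions.contains(path)) Accessor(path)
    else throw new RuntimeException(s"No date partitions found under $path")

  // With skip.missing.feature enabled (default "true" in this patch), a failed load yields
  // None so callers can drop the affected features and fall back to their configured
  // defaults; with it disabled, the load failure is rethrown, i.e. the pre-patch behaviour.
  def loadOrSkip(path: String, skipMissingFeature: Boolean): Option[Accessor] =
    try Some(loadAccessor(path))
    catch { case e: Exception => if (skipMissingFeature) None else throw e }

  def main(args: Array[String]): Unit = {
    println(loadOrSkip("daily/2018/05/01", skipMissingFeature = true)) // Some(Accessor(daily/2018/05/01))
    println(loadOrSkip("daily/2018/05/04", skipMissingFeature = true)) // None -> feature skipped downstream
    // loadOrSkip("daily/2018/05/04", skipMissingFeature = false)      // would rethrow the load failure
  }
}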