Merged: changes from all commits (26 commits)
be7ada4  Extract Spark Plan product with keys for Scala 2.13 (charlesmyu, Oct 10, 2025)
9c19e11  Extract Spark Plan product values for Spark 2.12 (charlesmyu, Oct 10, 2025)
3f9c26b  Update tests for meta field in Spark SQL plans (charlesmyu, Oct 14, 2025)
94f3139  Remove unused logic to parse children, enrich product parsing to supp… (charlesmyu, Oct 15, 2025)
beb4d5f  Update tests to assert on meta values (charlesmyu, Oct 16, 2025)
750c68f  Use Abstract class for common functions (charlesmyu, Oct 16, 2025)
0279fff  Use Jackson JSON parser instead of rolling own parsing (charlesmyu, Oct 16, 2025)
50fa41a  Refactor AbstractSparkPlanUtils to only require key generation on impl (charlesmyu, Oct 17, 2025)
51835fa  Default to returning null if class not recognized, limit recursion de… (charlesmyu, Oct 20, 2025)
c68b356  Improve testing scheme for Spark32 on Scala 212 with unknown keys (charlesmyu, Oct 20, 2025)
8bf8488  Improve method & class naming, reuse ObjectMapper from listener (charlesmyu, Oct 21, 2025)
55b917b  Gate Spark Plan parsing with flag (charlesmyu, Oct 21, 2025)
047880a  Match classes by string comparison, add negative cache (charlesmyu, Oct 21, 2025)
3619b77  Add unit tests for AbstractSparkPlanSerializer (charlesmyu, Oct 23, 2025)
53918a3  Make ObjectMapper protected on AbstractDatadogSparkListener instead o… (charlesmyu, Oct 23, 2025)
6e68c69  Specify correct helper class names (charlesmyu, Oct 23, 2025)
5483ba8  Add dd.data.jobs.experimental_features.enabled FF (charlesmyu, Oct 23, 2025)
e4973fc  Remove knownMatchingTypes override from version-specific impls (charlesmyu, Oct 24, 2025)
5527ad0  Catch NullPointerException for getDeclaredMethod calls (charlesmyu, Oct 24, 2025)
1f31add  Adjust more gates to match classes using string comparison (charlesmyu, Oct 24, 2025)
18e51d5  Revert "Catch NullPointerException for getDeclaredMethod calls" (charlesmyu, Oct 24, 2025)
de336b9  Explicit cast to String on simpleString calls (charlesmyu, Oct 27, 2025)
160558e  Use toMap to convert mutable to immutable map in Scala 2.12 (charlesmyu, Oct 27, 2025)
d4c8264  Improvements from comments: use singleton, string concat instead of f… (charlesmyu, Oct 27, 2025)
ef18062  Avoid merge conflict, reorder flags (charlesmyu, Oct 27, 2025)
34528dc  Exit loop early, store reflected methods as class fields (charlesmyu, Oct 28, 2025)
Spark212Instrumentation.java
@@ -8,15 +8,20 @@
import datadog.trace.api.Config;
import net.bytebuddy.asm.Advice;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.SparkPlanInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Predef;
import scala.collection.JavaConverters;

@AutoService(InstrumenterModule.class)
public class Spark212Instrumentation extends AbstractSparkInstrumentation {
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".AbstractDatadogSparkListener",
packageName + ".AbstractSparkPlanSerializer",
packageName + ".DatabricksParentContext",
packageName + ".OpenlineageParentContext",
packageName + ".DatadogSpark212Listener",
@@ -27,6 +32,7 @@ public String[] helperClassNames() {
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",
packageName + ".Spark212PlanSerializer"
};
}

@@ -40,6 +46,13 @@ public void methodAdvice(MethodTransformer transformer) {
.and(isDeclaredBy(named("org.apache.spark.SparkContext")))
.and(takesNoArguments()),
Spark212Instrumentation.class.getName() + "$InjectListener");

transformer.applyAdvice(
isMethod()
.and(named("fromSparkPlan"))
.and(takesArgument(0, named("org.apache.spark.sql.execution.SparkPlan")))
.and(isDeclaredBy(named("org.apache.spark.sql.execution.SparkPlanInfo$"))),
Spark212Instrumentation.class.getName() + "$SparkPlanInfoAdvice");
}

public static class InjectListener {
@@ -78,4 +91,25 @@ public static void enter(@Advice.This SparkContext sparkContext) {
sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
}
}

public static class SparkPlanInfoAdvice {
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void exit(
@Advice.Return(readOnly = false) SparkPlanInfo planInfo,
@Advice.Argument(0) SparkPlan plan) {
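// Only synthesize metadata when Spark produced none, and only when enabled
// via the parse-spark-plan or experimental-features flag.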
if (planInfo.metadata().size() == 0
&& (Config.get().isDataJobsParseSparkPlanEnabled()
|| Config.get().isDataJobsExperimentalFeaturesEnabled())) {
Spark212PlanSerializer planUtils = new Spark212PlanSerializer();
planInfo =
new SparkPlanInfo(
planInfo.nodeName(),
planInfo.simpleString(),
planInfo.children(),
JavaConverters.mapAsScalaMap(planUtils.extractFormattedProduct(plan))
.toMap(Predef.$conforms()),
planInfo.metrics());
}
}
}
}
Spark212PlanSerializer.java
@@ -0,0 +1,10 @@
package datadog.trace.instrumentation.spark;

import org.apache.spark.sql.catalyst.trees.TreeNode;

public class Spark212PlanSerializer extends AbstractSparkPlanSerializer {
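// Scala 2.12's Product API lacks productElementName, so a plan node's field
// names cannot be recovered; fall back to positional placeholder keys.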
@Override
public String getKey(int idx, TreeNode node) {
return "_dd.unknown_key." + idx;
}
}
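Both version-specific serializers extend AbstractSparkPlanSerializer, which is not included in this diff. Below is a minimal sketch of the shared traversal it plausibly provides, assuming the shape implied by the commit history (common logic in the abstract class, subclasses supplying only key generation); the real class also serializes nested values to JSON via Jackson, limits recursion depth, and caches reflected methods, all omitted here:

package datadog.trace.instrumentation.spark;

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.spark.sql.catalyst.trees.TreeNode;

// Sketch only; not the PR's actual implementation.
public abstract class AbstractSparkPlanSerializer {
  // Version-specific subclasses decide how each product element is keyed.
  public abstract String getKey(int idx, TreeNode node);

  // Flatten the plan node's Product elements into the string map that the
  // advice installs as SparkPlanInfo metadata. TreeNode extends scala.Product,
  // so arity and positional element access are available from Java.
  public Map<String, String> extractFormattedProduct(TreeNode plan) {
    Map<String, String> metadata = new LinkedHashMap<>();
    for (int i = 0; i < plan.productArity(); i++) {
      Object element = plan.productElement(i);
      if (element != null) {
        metadata.put(getKey(i, plan), String.valueOf(element));
      }
    }
    return metadata;
  }
}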
Spark213Instrumentation.java
@@ -8,15 +8,20 @@
import datadog.trace.api.Config;
import net.bytebuddy.asm.Advice;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.SparkPlanInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.immutable.HashMap;

@AutoService(InstrumenterModule.class)
public class Spark213Instrumentation extends AbstractSparkInstrumentation {
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".AbstractDatadogSparkListener",
packageName + ".AbstractSparkPlanSerializer",
packageName + ".DatabricksParentContext",
packageName + ".OpenlineageParentContext",
packageName + ".DatadogSpark213Listener",
@@ -27,6 +32,7 @@ public String[] helperClassNames() {
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",
packageName + ".Spark213PlanSerializer"
};
}

@@ -40,6 +46,13 @@ public void methodAdvice(MethodTransformer transformer) {
.and(isDeclaredBy(named("org.apache.spark.SparkContext")))
.and(takesNoArguments()),
Spark213Instrumentation.class.getName() + "$InjectListener");

transformer.applyAdvice(
isMethod()
.and(named("fromSparkPlan"))
.and(takesArgument(0, named("org.apache.spark.sql.execution.SparkPlan")))
.and(isDeclaredBy(named("org.apache.spark.sql.execution.SparkPlanInfo$"))),
Spark213Instrumentation.class.getName() + "$SparkPlanInfoAdvice");
}

public static class InjectListener {
@@ -79,4 +92,24 @@ public static void enter(@Advice.This SparkContext sparkContext) {
sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
}
}

public static class SparkPlanInfoAdvice {
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void exit(
@Advice.Return(readOnly = false) SparkPlanInfo planInfo,
@Advice.Argument(0) SparkPlan plan) {
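// Same gating as the Scala 2.12 advice; only the Java-to-Scala map conversion differs.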
if (planInfo.metadata().size() == 0
&& (Config.get().isDataJobsParseSparkPlanEnabled()
|| Config.get().isDataJobsExperimentalFeaturesEnabled())) {
Spark213PlanSerializer planUtils = new Spark213PlanSerializer();
planInfo =
new SparkPlanInfo(
planInfo.nodeName(),
planInfo.simpleString(),
planInfo.children(),
HashMap.from(JavaConverters.asScala(planUtils.extractFormattedProduct(plan))),
planInfo.metrics());
}
}
}
}
Spark213PlanSerializer.java
@@ -0,0 +1,10 @@
package datadog.trace.instrumentation.spark;

import org.apache.spark.sql.catalyst.trees.TreeNode;

public class Spark213PlanSerializer extends AbstractSparkPlanSerializer {
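// Scala 2.13 added Product.productElementName, so the plan node's actual
// field name can be used as the metadata key.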
@Override
public String getKey(int idx, TreeNode node) {
return node.productElementName(idx);
}
}
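As an illustration with a hypothetical node shape (not taken from this diff): for a plan node like case class GlobalLimitExec(limit: Int, child: SparkPlan), the Scala 2.13 serializer keys element 0 as "limit", while the Scala 2.12 serializer can only emit "_dd.unknown_key.0".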
AbstractDatadogSparkListener.java
@@ -66,7 +66,7 @@
*/
public abstract class AbstractDatadogSparkListener extends SparkListener {
private static final Logger log = LoggerFactory.getLogger(AbstractDatadogSparkListener.class);
- private static final ObjectMapper objectMapper = new ObjectMapper();
+ protected static final ObjectMapper objectMapper = new ObjectMapper();
public static volatile AbstractDatadogSparkListener listener = null;

public static volatile boolean finishTraceOnApplicationEnd = true;
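Widening the listener's objectMapper from private to protected lets the plan-serialization code added in this PR reuse a single ObjectMapper instead of constructing its own (per the "reuse ObjectMapper from listener" commit).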
AbstractSparkInstrumentation.java
@@ -36,7 +36,8 @@ public String[] knownMatchingTypes() {
"org.apache.spark.deploy.yarn.ApplicationMaster",
"org.apache.spark.util.Utils",
"org.apache.spark.util.SparkClassUtils",
"org.apache.spark.scheduler.LiveListenerBus"
"org.apache.spark.scheduler.LiveListenerBus",
"org.apache.spark.sql.execution.SparkPlanInfo$"
};
}
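The trailing $ in SparkPlanInfo$ matches the Scala companion object of SparkPlanInfo, which declares the fromSparkPlan factory method intercepted by the new SparkPlanInfoAdvice.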
