Skip to content

Commit e6d0da9

Browse files
Extract Vert.x json body response schemas
1 parent ad782b6 commit e6d0da9

File tree

15 files changed

+313
-3
lines changed

15 files changed

+313
-3
lines changed

dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/AppSecRequestContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public class AppSecRequestContext implements DataBundle, Closeable {
105105
private boolean reqDataPublished;
106106
private boolean rawReqBodyPublished;
107107
private boolean convertedReqBodyPublished;
108+
private boolean responseBodyPublished;
108109
private boolean respDataPublished;
109110
private boolean pathParamsPublished;
110111
private volatile Map<String, String> derivatives;
@@ -502,6 +503,14 @@ public void setConvertedReqBodyPublished(boolean convertedReqBodyPublished) {
502503
this.convertedReqBodyPublished = convertedReqBodyPublished;
503504
}
504505

506+
public boolean isResponseBodyPublished() {
507+
return responseBodyPublished;
508+
}
509+
510+
public void setResponseBodyPublished(final boolean responseBodyPublished) {
511+
this.responseBodyPublished = responseBodyPublished;
512+
}
513+
505514
public boolean isRespDataPublished() {
506515
return respDataPublished;
507516
}

dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ public class GatewayBridge {
9696
private volatile DataSubscriberInfo initialReqDataSubInfo;
9797
private volatile DataSubscriberInfo rawRequestBodySubInfo;
9898
private volatile DataSubscriberInfo requestBodySubInfo;
99+
private volatile DataSubscriberInfo responseBodySubInfo;
99100
private volatile DataSubscriberInfo pathParamsSubInfo;
100101
private volatile DataSubscriberInfo respDataSubInfo;
101102
private volatile DataSubscriberInfo grpcServerMethodSubInfo;
@@ -135,6 +136,7 @@ public void init() {
135136
subscriptionService.registerCallback(EVENTS.requestMethodUriRaw(), this::onRequestMethodUriRaw);
136137
subscriptionService.registerCallback(EVENTS.requestBodyStart(), this::onRequestBodyStart);
137138
subscriptionService.registerCallback(EVENTS.requestBodyDone(), this::onRequestBodyDone);
139+
subscriptionService.registerCallback(EVENTS.responseBody(), this::onResponseBody);
138140
subscriptionService.registerCallback(
139141
EVENTS.requestClientSocketAddress(), this::onRequestClientSocketAddress);
140142
subscriptionService.registerCallback(
@@ -175,6 +177,7 @@ public void reset() {
175177
initialReqDataSubInfo = null;
176178
rawRequestBodySubInfo = null;
177179
requestBodySubInfo = null;
180+
responseBodySubInfo = null;
178181
pathParamsSubInfo = null;
179182
respDataSubInfo = null;
180183
grpcServerMethodSubInfo = null;
@@ -636,6 +639,40 @@ private Flow<Void> onRequestBodyDone(RequestContext ctx_, StoredBodySupplier sup
636639
}
637640
}
638641

642+
private Flow<Void> onResponseBody(RequestContext ctx_, Object obj) {
643+
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
644+
if (ctx == null) {
645+
return NoopFlow.INSTANCE;
646+
}
647+
648+
if (ctx.isResponseBodyPublished()) {
649+
log.debug(
650+
"Response body already published; will ignore new value of type {}", obj.getClass());
651+
return NoopFlow.INSTANCE;
652+
}
653+
ctx.setResponseBodyPublished(true);
654+
655+
while (true) {
656+
DataSubscriberInfo subInfo = responseBodySubInfo;
657+
if (subInfo == null) {
658+
subInfo = producerService.getDataSubscribers(KnownAddresses.RESPONSE_BODY_OBJECT);
659+
responseBodySubInfo = subInfo;
660+
}
661+
if (subInfo == null || subInfo.isEmpty()) {
662+
return NoopFlow.INSTANCE;
663+
}
664+
// TODO: review schema extraction limits
665+
Object converted = ObjectIntrospection.convert(obj, ctx);
666+
DataBundle bundle = new SingletonDataBundle<>(KnownAddresses.RESPONSE_BODY_OBJECT, converted);
667+
try {
668+
GatewayContext gwCtx = new GatewayContext(false);
669+
return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx);
670+
} catch (ExpiredSubscriberInfoException e) {
671+
responseBodySubInfo = null;
672+
}
673+
}
674+
}
675+
639676
private Flow<Void> onRequestPathParams(RequestContext ctx_, Map<String, ?> data) {
640677
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
641678
if (ctx == null || ctx.isPathParamsPublished()) {

dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/gateway/GatewayBridgeSpecification.groovy

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ class GatewayBridgeSpecification extends DDSpecification {
9999
BiFunction<RequestContext, StoredBodySupplier, Void> requestBodyStartCB
100100
BiFunction<RequestContext, StoredBodySupplier, Flow<Void>> requestBodyDoneCB
101101
BiFunction<RequestContext, Object, Flow<Void>> requestBodyProcessedCB
102+
BiFunction<RequestContext, Object, Flow<Void>> responseBodyCB
102103
BiFunction<RequestContext, Integer, Flow<Void>> responseStartedCB
103104
TriConsumer<RequestContext, String, String> respHeaderCB
104105
Function<RequestContext, Flow<Void>> respHeadersDoneCB
@@ -450,6 +451,7 @@ class GatewayBridgeSpecification extends DDSpecification {
450451
1 * ig.registerCallback(EVENTS.requestBodyStart(), _) >> { requestBodyStartCB = it[1]; null }
451452
1 * ig.registerCallback(EVENTS.requestBodyDone(), _) >> { requestBodyDoneCB = it[1]; null }
452453
1 * ig.registerCallback(EVENTS.requestBodyProcessed(), _) >> { requestBodyProcessedCB = it[1]; null }
454+
1 * ig.registerCallback(EVENTS.responseBody(), _) >> { responseBodyCB = it[1]; null }
453455
1 * ig.registerCallback(EVENTS.responseStarted(), _) >> { responseStartedCB = it[1]; null }
454456
1 * ig.registerCallback(EVENTS.responseHeader(), _) >> { respHeaderCB = it[1]; null }
455457
1 * ig.registerCallback(EVENTS.responseHeaderDone(), _) >> { respHeadersDoneCB = it[1]; null }
@@ -1327,4 +1329,17 @@ class GatewayBridgeSpecification extends DDSpecification {
13271329
arCtx.getRoute() == route
13281330
}
13291331
1332+
void 'test on response body callback'() {
1333+
when:
1334+
responseBodyCB.apply(ctx, [test: 'this is a test'])
1335+
1336+
then:
1337+
1 * eventDispatcher.getDataSubscribers(KnownAddresses.RESPONSE_BODY_OBJECT) >> nonEmptyDsInfo
1338+
1 * eventDispatcher.publishDataEvent(_, _, _, _) >> {
1339+
final bundle = it[2] as DataBundle
1340+
final body = bundle.get(KnownAddresses.RESPONSE_BODY_OBJECT)
1341+
assert body['test'] == 'this is a test'
1342+
}
1343+
}
1344+
13301345
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package datadog.trace.instrumentation.vertx_4_0.server;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
5+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
6+
7+
import com.google.auto.service.AutoService;
8+
import datadog.trace.agent.tooling.Instrumenter;
9+
import datadog.trace.agent.tooling.InstrumenterModule;
10+
import datadog.trace.agent.tooling.muzzle.Reference;
11+
import io.vertx.ext.web.impl.RoutingContextImpl;
12+
13+
/**
14+
* @see RoutingContextImpl#getBodyAsJson(int)
15+
* @see RoutingContextImpl#getBodyAsJsonArray(int)
16+
*/
17+
@AutoService(InstrumenterModule.class)
18+
public class RoutingContextInstrumentation extends InstrumenterModule.AppSec
19+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
20+
21+
public RoutingContextInstrumentation() {
22+
super("vertx", "vertx-4.0");
23+
}
24+
25+
@Override
26+
public Reference[] additionalMuzzleReferences() {
27+
return new Reference[] {VertxVersionMatcher.HTTP_1X_SERVER_RESPONSE};
28+
}
29+
30+
@Override
31+
public String instrumentedType() {
32+
return "io.vertx.ext.web.RoutingContext";
33+
}
34+
35+
@Override
36+
public void methodAdvice(MethodTransformer transformer) {
37+
transformer.applyAdvice(
38+
named("json").and(takesArguments(1)).and(takesArgument(0, Object.class)),
39+
packageName + ".RoutingContextJsonResponseAdvice");
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package datadog.trace.instrumentation.vertx_4_0.server;
2+
3+
import static datadog.trace.api.gateway.Events.EVENTS;
4+
5+
import datadog.appsec.api.blocking.BlockingException;
6+
import datadog.trace.advice.ActiveRequestContext;
7+
import datadog.trace.advice.RequiresRequestContext;
8+
import datadog.trace.api.gateway.BlockResponseFunction;
9+
import datadog.trace.api.gateway.CallbackProvider;
10+
import datadog.trace.api.gateway.Flow;
11+
import datadog.trace.api.gateway.RequestContext;
12+
import datadog.trace.api.gateway.RequestContextSlot;
13+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
14+
import io.vertx.core.json.JsonObject;
15+
import java.util.function.BiFunction;
16+
import net.bytebuddy.asm.Advice;
17+
18+
@RequiresRequestContext(RequestContextSlot.APPSEC)
19+
class RoutingContextJsonResponseAdvice {
20+
21+
@Advice.OnMethodEnter(suppress = Throwable.class)
22+
static void before(
23+
@Advice.Argument(0) Object source, @ActiveRequestContext RequestContext reqCtx) {
24+
25+
if (source == null) {
26+
return;
27+
}
28+
29+
Object object = source;
30+
if (object instanceof JsonObject) {
31+
object = ((JsonObject) object).getMap();
32+
}
33+
34+
CallbackProvider cbp = AgentTracer.get().getCallbackProvider(RequestContextSlot.APPSEC);
35+
BiFunction<RequestContext, Object, Flow<Void>> callback =
36+
cbp.getCallback(EVENTS.responseBody());
37+
if (callback == null) {
38+
return;
39+
}
40+
41+
Flow<Void> flow = callback.apply(reqCtx, object);
42+
Flow.Action action = flow.getAction();
43+
if (action instanceof Flow.Action.RequestBlockingAction) {
44+
BlockResponseFunction blockResponseFunction = reqCtx.getBlockResponseFunction();
45+
if (blockResponseFunction == null) {
46+
return;
47+
}
48+
Flow.Action.RequestBlockingAction rba = (Flow.Action.RequestBlockingAction) action;
49+
blockResponseFunction.tryCommitBlockingResponse(
50+
reqCtx.getTraceSegment(),
51+
rba.getStatusCode(),
52+
rba.getBlockingContentType(),
53+
rba.getExtraHeaders());
54+
55+
throw new BlockingException("Blocked request (for RoutingContext/json)");
56+
}
57+
}
58+
}

dd-java-agent/instrumentation/vertx-web-4.0/src/test/groovy/server/VertxHttpServerForkedTest.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ class VertxHttpServerForkedTest extends HttpServerTest<Vertx> {
8383
true
8484
}
8585

86+
@Override
87+
boolean testResponseBodyJson() {
88+
true
89+
}
90+
8691
@Override
8792
boolean testBlocking() {
8893
true

dd-java-agent/instrumentation/vertx-web-4.0/src/test/java/server/VertxTestServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ public void start(final Promise<Void> startPromise) {
127127
BODY_JSON,
128128
() -> {
129129
JsonObject json = ctx.getBodyAsJson();
130-
ctx.response().setStatusCode(BODY_JSON.getStatus()).end(json.toString());
130+
ctx.response().setStatusCode(BODY_JSON.getStatus());
131+
ctx.json(json);
131132
}));
132133
router
133134
.route(QUERY_ENCODED_BOTH.getRawPath())

dd-java-agent/instrumentation/vertx-web-5.0/src/test/groovy/server/VertxHttpServerForkedTest.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ class VertxHttpServerForkedTest extends HttpServerTest<Vertx> {
6767
true
6868
}
6969

70+
@Override
71+
boolean testResponseBodyJson() {
72+
true
73+
}
74+
7075
@Override
7176
boolean testBodyUrlencoded() {
7277
true

dd-java-agent/instrumentation/vertx-web-5.0/src/test/java/server/VertxTestServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ public void start(final Promise<Void> startPromise) {
118118
BODY_JSON,
119119
() -> {
120120
JsonObject json = ctx.body().asJsonObject();
121-
ctx.response().setStatusCode(BODY_JSON.getStatus()).end(json.toString());
121+
ctx.response().setStatusCode(BODY_JSON.getStatus());
122+
ctx.json(json);
122123
}));
123124
router
124125
.route(QUERY_ENCODED_BOTH.getRawPath())

dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/base/HttpServerTest.groovy

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
135135
ss.registerCallback(events.requestBodyStart(), callbacks.requestBodyStartCb)
136136
ss.registerCallback(events.requestBodyDone(), callbacks.requestBodyEndCb)
137137
ss.registerCallback(events.requestBodyProcessed(), callbacks.requestBodyObjectCb)
138+
ss.registerCallback(events.responseBody(), callbacks.responseBodyObjectCb)
138139
ss.registerCallback(events.responseStarted(), callbacks.responseStartedCb)
139140
ss.registerCallback(events.responseHeader(), callbacks.responseHeaderCb)
140141
ss.registerCallback(events.responseHeaderDone(), callbacks.responseHeaderDoneCb)
@@ -335,6 +336,7 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
335336
false
336337
}
337338

339+
338340
boolean isRequestBodyNoStreaming() {
339341
// if true, plain text request body tests expect the requestBodyProcessed
340342
// callback to tbe called, not requestBodyStart/requestBodyDone
@@ -353,6 +355,10 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
353355
false
354356
}
355357

358+
boolean testResponseBodyJson() {
359+
false
360+
}
361+
356362
boolean testBlocking() {
357363
false
358364
}
@@ -1581,6 +1587,40 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
15811587
true | 'text/html;q=0.8, application/json;q=0.9'
15821588
}
15831589

1590+
void 'test instrumentation gateway json response body'() {
1591+
setup:
1592+
assumeTrue(testResponseBodyJson())
1593+
def request = request(
1594+
BODY_JSON, 'POST',
1595+
RequestBody.create(MediaType.get('application/json'), '{"a": "x"}'))
1596+
.header(IG_RESPONSE_BODY_TAG, 'true')
1597+
.build()
1598+
def response = client.newCall(request).execute()
1599+
if (isDataStreamsEnabled()) {
1600+
TEST_DATA_STREAMS_WRITER.waitForGroups(1)
1601+
}
1602+
1603+
expect:
1604+
response.body().charStream().text == BODY_JSON.body
1605+
1606+
when:
1607+
TEST_WRITER.waitForTraces(1)
1608+
1609+
then:
1610+
TEST_WRITER.get(0).any {
1611+
it.getTag('response.body') == '[a:[x]]'
1612+
}
1613+
1614+
and:
1615+
if (isDataStreamsEnabled()) {
1616+
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
1617+
verifyAll(first) {
1618+
edgeTags.containsAll(DSM_EDGE_TAGS)
1619+
edgeTags.size() == DSM_EDGE_TAGS.size()
1620+
}
1621+
}
1622+
}
1623+
15841624
@Flaky(value = "https://github.com/DataDog/dd-trace-java/issues/4681", suites = ["GrizzlyAsyncTest", "GrizzlyTest"])
15851625
def 'test blocking of request with json response'() {
15861626
setup:
@@ -2280,6 +2320,7 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
22802320
static final String IG_BODY_END_BLOCK_HEADER = "x-block-body-end"
22812321
static final String IG_BODY_CONVERTED_HEADER = "x-block-body-converted"
22822322
static final String IG_ASK_FOR_RESPONSE_HEADER_TAGS_HEADER = "x-include-response-headers-in-tags"
2323+
static final String IG_RESPONSE_BODY_TAG = "x-include-response-body-in-tags"
22832324
static final String IG_PEER_ADDRESS = "ig-peer-address"
22842325
static final String IG_PEER_PORT = "ig-peer-port"
22852326
static final String IG_RESPONSE_STATUS = "ig-response-status"
@@ -2303,6 +2344,7 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
23032344
boolean bodyEndBlock
23042345
boolean bodyConvertedBlock
23052346
boolean responseHeadersInTags
2347+
boolean responseBodyTag
23062348
}
23072349

23082350
static final String stringOrEmpty(String string) {
@@ -2356,6 +2398,9 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
23562398
if (IG_ASK_FOR_RESPONSE_HEADER_TAGS_HEADER.equalsIgnoreCase(key)) {
23572399
context.responseHeadersInTags = true
23582400
}
2401+
if (IG_RESPONSE_BODY_TAG.equalsIgnoreCase(key)) {
2402+
context.responseBodyTag = true
2403+
}
23592404
} as TriConsumer<RequestContext, String, String>
23602405

23612406
final Function<RequestContext, Flow<Void>> requestHeaderDoneCb =
@@ -2450,6 +2495,33 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
24502495
}
24512496
} as BiFunction<RequestContext, Object, Flow<Void>>)
24522497

2498+
final BiFunction<RequestContext, Object, Flow<Void>> responseBodyObjectCb =
2499+
({ RequestContext rqCtxt, Object obj ->
2500+
if (obj instanceof Map) {
2501+
obj = obj.collectEntries {
2502+
[
2503+
it.key,
2504+
(it.value instanceof Iterable || it.value instanceof String[]) ? it.value : [it.value]
2505+
]
2506+
}
2507+
} else if (!(obj instanceof String) && !(obj instanceof List)) {
2508+
obj = obj.properties
2509+
.findAll { it.key != 'class' }
2510+
.collectEntries { [it.key, it.value instanceof Iterable ? it.value : [it.value]] }
2511+
}
2512+
Context context = rqCtxt.getData(RequestContextSlot.APPSEC)
2513+
if (context.responseBodyTag) {
2514+
rqCtxt.traceSegment.setTagTop('response.body', obj as String)
2515+
}
2516+
if (context.responseBlock) {
2517+
new RbaFlow(
2518+
new Flow.Action.RequestBlockingAction(413, BlockingContentType.JSON)
2519+
)
2520+
} else {
2521+
Flow.ResultFlow.empty()
2522+
}
2523+
} as BiFunction<RequestContext, Object, Flow<Void>>)
2524+
24532525
final BiFunction<RequestContext, Integer, Flow<Void>> responseStartedCb =
24542526
({ RequestContext rqCtxt, Integer resultCode ->
24552527
Context context = rqCtxt.getData(RequestContextSlot.APPSEC)

0 commit comments

Comments
 (0)