Skip to content

Commit f9072dd

Browse files
authored
trace/api: fix forwardingTransport with multiple targets (#42515)
### What does this PR do? Before this change we'd close the response body for the request that we RoundTrip which violates the contract of the RoundTripper used by the ReverseProxy. The result was that all the proxied requests looked like they had failed. In practice they hadn't, but it lead to quite a bit of scary logging, and it prevented the client of the proxy from actually getting the response. Also, limit the rate of error logging when sending to secondary endpoints. ### Motivation We recently started using multiple endpoints in some clusters and observed a lot of these logs (this code used to never be exercised). Worse yet, this had caused panics (#42339). ### Describe how you validated your changes Testing. Co-authored-by: andrew.werner <[email protected]>
1 parent 199b574 commit f9072dd

File tree

2 files changed

+114
-55
lines changed

2 files changed

+114
-55
lines changed

pkg/trace/api/transports.go

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ import (
1515
"sync"
1616
"time"
1717

18-
"github.com/DataDog/datadog-agent/pkg/trace/log"
1918
"github.com/DataDog/datadog-go/v5/statsd"
19+
20+
"github.com/DataDog/datadog-agent/pkg/trace/log"
2021
)
2122

2223
// measuringTransport is a transport that emits count and timing metrics
@@ -56,6 +57,7 @@ type forwardingTransport struct {
5657
rt http.RoundTripper
5758
targets []*url.URL
5859
keys []string
60+
logger *log.ThrottledLogger
5961
}
6062

6163
// newForwardingTransport creates a new forwardingTransport, wrapping another
@@ -80,12 +82,12 @@ func newForwardingTransport(
8082
apiKeys = append(apiKeys, strings.TrimSpace(key))
8183
}
8284
}
83-
return &forwardingTransport{rt, targets, apiKeys}
85+
return &forwardingTransport{rt, targets, apiKeys, log.NewThrottled(10, 10*time.Second)}
8486
}
8587

8688
// RoundTrip makes an HTTP round trip forwarding one request to multiple
8789
// additional endpoints.
88-
func (m *forwardingTransport) RoundTrip(req *http.Request) (rres *http.Response, rerr error) {
90+
func (m *forwardingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
8991
setTarget := func(r *http.Request, u *url.URL, apiKey string) {
9092
q := r.URL.Query()
9193
u.RawQuery = q.Encode()
@@ -98,46 +100,50 @@ func (m *forwardingTransport) RoundTrip(req *http.Request) (rres *http.Response,
98100
return m.rt.RoundTrip(req)
99101
}
100102

101-
slurp, err := io.ReadAll(req.Body)
102-
if err != nil {
103-
return nil, err
103+
var body []byte
104+
if req.Body != nil {
105+
slurp, err := io.ReadAll(req.Body)
106+
if err != nil {
107+
return nil, err
108+
}
109+
body = slurp
104110
}
105111

112+
roundTripAdditional := func(req *http.Request) {
113+
resp, err := m.rt.RoundTrip(req)
114+
if err == nil {
115+
// we discard responses for all subsequent requests
116+
io.Copy(io.Discard, resp.Body) //nolint:errcheck
117+
} else {
118+
m.logger.Error("error forwarding request to %s: %v", req.URL, err)
119+
}
120+
if resp != nil && resp.Body != nil {
121+
resp.Body.Close()
122+
}
123+
}
106124
var wg sync.WaitGroup
107-
wg.Add(len(m.targets))
108125
for i, u := range m.targets {
109-
go func(i int, u *url.URL) {
126+
if i == 0 {
127+
continue
128+
}
129+
newreq := req.Clone(req.Context())
130+
if body != nil {
131+
newreq.Body = io.NopCloser(bytes.NewReader(body))
132+
}
133+
setTarget(newreq, u, m.keys[i])
134+
wg.Add(1)
135+
go func() {
110136
defer wg.Done()
111-
newreq := req.Clone(req.Context())
112-
newreq.Body = io.NopCloser(bytes.NewReader(slurp))
113-
setTarget(newreq, u, m.keys[i])
114-
if i == 0 {
115-
// Given the way we construct the list of targets the main endpoint
116-
// will be the first one called, we return its response and error.
117-
// Ignoring bodyclose lint here because of a bug in the linter:
118-
// https://github.com/timakin/bodyclose/issues/30.
119-
rres, rerr = m.rt.RoundTrip(newreq) //nolint:bodyclose
120-
if rres != nil && rres.Body != nil {
121-
rres.Body.Close()
122-
}
123-
return
124-
}
125-
resp, err := m.rt.RoundTrip(newreq)
126-
if err == nil {
127-
// we discard responses for all subsequent requests
128-
io.Copy(io.Discard, resp.Body) //nolint:errcheck
129-
} else {
130-
log.Error(err)
131-
}
132-
if resp != nil && resp.Body != nil {
133-
resp.Body.Close()
134-
}
135-
136-
}(i, u)
137+
roundTripAdditional(newreq)
138+
}()
139+
}
140+
setTarget(req, m.targets[0], m.keys[0])
141+
if body != nil {
142+
req.Body = io.NopCloser(bytes.NewReader(body))
137143
}
144+
res, err := m.rt.RoundTrip(req)
138145
wg.Wait()
139-
140-
return rres, rerr
146+
return res, err
141147
}
142148

143149
// newMeasuringForwardingTransport creates a forwardingTransport wrapped in a measuringTransport.

pkg/trace/api/transports_test.go

Lines changed: 72 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,34 +9,25 @@ import (
99
"errors"
1010
"io"
1111
"net/http"
12+
"net/http/httptest"
1213
"net/url"
1314
"strings"
1415
"testing"
1516
)
1617

17-
// mockTransport is a simple mock that returns a nil response and an error
18-
type mockTransport struct {
19-
returnNilResponse bool
20-
}
18+
// nilResponseTransport is a simple mock that returns a nil response and an
19+
// error.
20+
type nilResponseTransport struct{}
2121

22-
func (m *mockTransport) RoundTrip(_ *http.Request) (*http.Response, error) {
23-
if m.returnNilResponse {
24-
return nil, errors.New("mock error with nil response")
25-
}
26-
return &http.Response{
27-
StatusCode: 200,
28-
Body: io.NopCloser(strings.NewReader("ok")),
29-
}, nil
22+
func (m *nilResponseTransport) RoundTrip(_ *http.Request) (*http.Response, error) {
23+
return nil, errors.New("mock error with nil response")
3024
}
3125

3226
// TestForwardingTransport_NilResponse verifies that RoundTrip doesn't panic
3327
// when the underlying transport returns a nil response with an error.
3428
func TestForwardingTransport_NilResponse(t *testing.T) {
35-
mockRT := &mockTransport{returnNilResponse: true}
3629
mainEndpoint, _ := url.Parse("http://localhost:8080")
37-
38-
ft := newForwardingTransport(mockRT, mainEndpoint, "test-key", nil)
39-
30+
ft := newForwardingTransport(&nilResponseTransport{}, mainEndpoint, "test-key", nil)
4031
req, err := http.NewRequest("POST", "http://localhost:8080/v1/traces", strings.NewReader("test body"))
4132
if err != nil {
4233
t.Fatalf("failed to create request: %v", err)
@@ -60,14 +51,12 @@ func TestForwardingTransport_NilResponse(t *testing.T) {
6051
// TestForwardingTransport_MultipleTargetsNilResponse verifies that RoundTrip doesn't panic
6152
// when forwarding to multiple targets and the underlying transport returns a nil response.
6253
func TestForwardingTransport_MultipleTargetsNilResponse(t *testing.T) {
63-
mockRT := &mockTransport{returnNilResponse: true}
6454
mainEndpoint, _ := url.Parse("http://localhost:8080")
6555
additionalEndpoints := map[string][]string{
6656
"http://localhost:8081": {"key1"},
6757
}
6858

69-
ft := newForwardingTransport(mockRT, mainEndpoint, "test-key", additionalEndpoints)
70-
59+
ft := newForwardingTransport(&nilResponseTransport{}, mainEndpoint, "test-key", additionalEndpoints)
7160
req, err := http.NewRequest("POST", "http://localhost:8080/v1/traces", strings.NewReader("test body"))
7261
if err != nil {
7362
t.Fatalf("failed to create request: %v", err)
@@ -87,3 +76,67 @@ func TestForwardingTransport_MultipleTargetsNilResponse(t *testing.T) {
8776
t.Error("expected nil response")
8877
}
8978
}
79+
80+
// TestForwardingTransport_MultipleTargets verifies that the transport forwards
81+
// the request to multiple targets correctly. It explicitly ensures that the
82+
// response body is not closed when forwarding to multiple targets. Furthermore,
83+
// it tests explicitly that the transport works when the request body is nil.
84+
func TestForwardingTransport_MultipleTargets(t *testing.T) {
85+
const responseBody = "ok"
86+
setupTransport := func(t *testing.T) http.RoundTripper {
87+
h := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
88+
w.Write([]byte(responseBody))
89+
})
90+
s := httptest.NewServer(h)
91+
t.Cleanup(s.Close)
92+
s2 := httptest.NewServer(h)
93+
t.Cleanup(s2.Close)
94+
mainEndpoint, _ := url.Parse(s.URL)
95+
additionalEndpoint, _ := url.Parse(s2.URL)
96+
additionalEndpoints := map[string][]string{
97+
additionalEndpoint.String(): {"key1"},
98+
}
99+
return newForwardingTransport(http.DefaultTransport, mainEndpoint, "test-key", additionalEndpoints)
100+
}
101+
validateResponse := func(t *testing.T, rt http.RoundTripper, req *http.Request) {
102+
resp, err := rt.RoundTrip(req)
103+
if err != nil {
104+
t.Errorf("failed to round trip: %v", err)
105+
}
106+
if resp != nil && resp.Body != nil {
107+
defer resp.Body.Close()
108+
}
109+
read, err := io.ReadAll(resp.Body)
110+
if err != nil {
111+
t.Fatalf("failed to read response body: %v", err)
112+
}
113+
if read := string(read); read != responseBody {
114+
t.Fatalf("expected response body to be %q, got %q", responseBody, read)
115+
}
116+
}
117+
newRequest := func(t *testing.T, method, url string, body io.Reader) *http.Request {
118+
req, err := http.NewRequest(method, url, body)
119+
if err != nil {
120+
t.Fatalf("failed to create request: %v", err)
121+
}
122+
return req
123+
}
124+
for _, c := range []struct {
125+
name string
126+
req *http.Request
127+
}{
128+
{
129+
name: "nil request body",
130+
req: newRequest(t, "POST", "http://localhost:8080/v1/traces", nil /* body */),
131+
},
132+
{
133+
name: "non-nil request body",
134+
req: newRequest(t, "POST", "http://localhost:8080/v1/traces", strings.NewReader("request body")),
135+
},
136+
} {
137+
t.Run(c.name, func(t *testing.T) {
138+
rt := setupTransport(t)
139+
validateResponse(t, rt, c.req)
140+
})
141+
}
142+
}

0 commit comments

Comments
 (0)