diff --git a/internal/component/pyroscope/write/trace.go b/internal/component/pyroscope/write/trace.go new file mode 100644 index 0000000000..69f8325433 --- /dev/null +++ b/internal/component/pyroscope/write/trace.go @@ -0,0 +1,127 @@ +package write + +import ( + "crypto/tls" + "net/http/httptrace" + "strings" + "sync" + "time" + + "github.com/go-kit/log" +) + +type clientTrace struct { + trace *httptrace.ClientTrace + es [][]any + mu sync.Mutex +} + +func newClientTrace() *clientTrace { + t := &clientTrace{} + t.trace = &httptrace.ClientTrace{ + GetConn: func(hostPort string) { + t.log( + "msg", "GetConn", + "hostPort", hostPort, + ) + }, + GotConn: func(info httptrace.GotConnInfo) { + var remoteAddr, localAddr string + if info.Conn != nil { + remoteAddr = info.Conn.RemoteAddr().String() + localAddr = info.Conn.LocalAddr().String() + } + t.log( + "msg", "GotConn", + "Reused", info.Reused, + "WasIdle", info.WasIdle, + "IdleTime", info.IdleTime, + "RemoteAddr", remoteAddr, + "LocalAddr", localAddr, + ) + }, + PutIdleConn: func(err error) { + t.log( + "msg", "PutIdleConn", + "err", err, + ) + }, + GotFirstResponseByte: func() { + t.log("msg", "GotFirstResponseByte") + }, + Got100Continue: nil, + Got1xxResponse: nil, + DNSStart: func(info httptrace.DNSStartInfo) { + t.log( + "msg", "DNSStart", + "Host", info.Host, + ) + }, + DNSDone: func(info httptrace.DNSDoneInfo) { + var addrs []string + for _, addr := range info.Addrs { + addrs = append(addrs, addr.String()) + } + t.log( + "msg", "DNSDone", + "Addrs", strings.Join(addrs, ","), + "Coalesced", info.Coalesced, + "Err", info.Err, + ) + }, + ConnectStart: func(network, addr string) { + t.log( + "msg", "ConnectStart", + "addr", addr, + "network", network) + }, + ConnectDone: func(network, addr string, err error) { + t.log( + "msg", "ConnectDone", + "addr", addr, + "network", network, + "err", err, + ) + }, + TLSHandshakeStart: func() { + t.log("msg", "TLSHandshakeStart") + }, + TLSHandshakeDone: func(state tls.ConnectionState, err error) { + t.log( + "msg", "TLSHandshakeDone", + "Version", state.Version, + "CipherSuite", state.CipherSuite, + "ServerName", state.ServerName, + "NegotiatedProtocol", state.NegotiatedProtocol, + "err", err, + ) + }, + WroteHeaderField: nil, + WroteHeaders: func() { + t.log("msg", "WroteHeaders") + }, + Wait100Continue: nil, + WroteRequest: func(info httptrace.WroteRequestInfo) { + t.log( + "msg", "WroteRequest", + "Err", info.Err, + ) + }, + } + return t +} + +func (t *clientTrace) log(kvs ...any) { + t.mu.Lock() + defer t.mu.Unlock() + l := append([]any{"tt", time.Now()}, kvs...) + t.es = append(t.es, l) +} + +func (t *clientTrace) flush(logger log.Logger) { + t.mu.Lock() + defer t.mu.Unlock() + for _, e := range t.es { + _ = logger.Log(e...) + } +} diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 5aeb4c87c8..3313883c76 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "net/http/httptrace" "net/url" "path" "sort" @@ -45,6 +46,8 @@ var ( Tracing: TracingOptions{ JaegerPropagator: true, TraceContextPropagator: true, + HttpClientTraceErrors: false, + HttpClientTraceAll: false, }, } } @@ -89,6 +92,8 @@ type Arguments struct { type TracingOptions struct { JaegerPropagator bool `alloy:"jaeger_propagator,attr,optional"` TraceContextPropagator bool `alloy:"trace_context_propagator,attr,optional"` + HttpClientTraceErrors bool `alloy:"http_client_trace_errors,attr,optional"` + HttpClientTraceAll bool `alloy:"http_client_trace_all,attr,optional"` } // SetToDefault implements syntax.Defaulter. @@ -141,7 +146,7 @@ type Component struct { logger log.Logger tracer trace.Tracer onStateChange func(Exports) - cfg Arguments + arguments Arguments metrics *metrics userAgent string uid string @@ -171,7 +176,7 @@ func New( onStateChange(Exports{Receiver: receiver}) return &Component{ - cfg: c, + arguments: c, logger: logger, tracer: tracer, onStateChange: onStateChange, @@ -191,7 +196,7 @@ func (c *Component) Run(ctx context.Context) error { // Update implements Component. func (c *Component) Update(newConfig component.Arguments) error { - c.cfg = newConfig.(Arguments) + c.arguments = newConfig.(Arguments) receiver, err := newFanOut(c.logger, c.tracer, newConfig.(Arguments), c.metrics, c.userAgent, c.uid) if err != nil { return err @@ -201,68 +206,83 @@ func (c *Component) Update(newConfig component.Arguments) error { } type fanOutClient struct { - // The list of push clients to fan out to. - pushClients []pushv1connect.PusherServiceClient - ingestClients map[*EndpointOptions]*http.Client - config Arguments - metrics *metrics - tracer trace.Tracer - logger log.Logger + endpoints []endpoint + arguments Arguments + metrics *metrics + tracer trace.Tracer + logger log.Logger +} + +type endpoint struct { + pushClient pushv1connect.PusherServiceClient + ingestClient *http.Client + options *EndpointOptions + url *url.URL } // newFanOut creates a new fan out client that will fan out to all endpoints. func newFanOut(logger log.Logger, tracer trace.Tracer, config Arguments, metrics *metrics, userAgent string, uid string) (*fanOutClient, error) { - pushClients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints)) - ingestClients := make(map[*EndpointOptions]*http.Client) + clients := make([]endpoint, 0, len(config.Endpoints)) - for _, endpoint := range config.Endpoints { - if endpoint.Headers == nil { - endpoint.Headers = map[string]string{} + for _, e := range config.Endpoints { + u, err := url.Parse(e.URL) + if err != nil { + return nil, fmt.Errorf("parse URL: %w", err) } - endpoint.Headers["X-Alloy-Id"] = uid - httpClient, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name) + if e.Headers == nil { + e.Headers = map[string]string{} + } + e.Headers["X-Alloy-Id"] = uid + e.Headers["User-Agent"] = userAgent + httpClient, err := commonconfig.NewClientFromConfig(*e.HTTPClientConfig.Convert(), e.Name) if err != nil { return nil, err } configureTracing(config, httpClient) - pushClients = append( - pushClients, - pushv1connect.NewPusherServiceClient(httpClient, endpoint.URL, WithUserAgent(userAgent)), + push := pushv1connect.NewPusherServiceClient(httpClient, e.URL) + clients = append( + clients, + endpoint{ + pushClient: push, + ingestClient: httpClient, + options: e, + url: u, + }, ) - ingestClients[endpoint] = httpClient } return &fanOutClient{ - logger: logger, - tracer: tracer, - pushClients: pushClients, - ingestClients: ingestClients, - config: config, - metrics: metrics, + endpoints: clients, + logger: logger, + tracer: tracer, + arguments: config, + metrics: metrics, }, nil } -// Push implements the PusherServiceClient interface. -func (f *fanOutClient) Push( - ctx context.Context, - req *connect.Request[pushv1.PushRequest], -) (*connect.Response[pushv1.PushResponse], error) { +type forwardRequest struct { + op string + reqSize, profileCount int64 + impl func(ctx context.Context, l log.Logger, e endpoint) error +} - defer f.observeLatency("-", "push_total")() +// forward forwards and multiplex push and ingest requests down to endpoints with retries +func (f *fanOutClient) forward(ctx context.Context, req forwardRequest) error { + defer f.observeLatency("-", req.op+"_total")() - ctx, sp := f.tracer.Start(ctx, "Push") + ctx, sp := f.tracer.Start(ctx, req.op) defer sp.End() var ( - wg sync.WaitGroup - errs error - errorMut sync.Mutex - dl any - ok bool - reqSize, profileCount = requestSize(req) - l = util.TraceLog(f.logger, sp) - st = time.Now() + wg sync.WaitGroup + errs error + errorMut sync.Mutex + dl any + ok bool + l = util.TraceLog(f.logger, sp) + st = time.Now() ) + l = log.With(l, "op", req.op) if dl, ok = ctx.Deadline(); !ok { dl = "none" } @@ -273,74 +293,79 @@ func (f *fanOutClient) Push( l = level.Debug(l) } _ = l.Log( - "msg", "Push", - "sz", reqSize, - "n", profileCount, + "sz", req.reqSize, + "n", req.profileCount, "dl", dl, "st", st, ) }() - for i, client := range f.pushClients { - var ( - client = client - i = i - backoff = backoff.New(ctx, backoff.Config{ - MinBackoff: f.config.Endpoints[i].MinBackoff, - MaxBackoff: f.config.Endpoints[i].MaxBackoff, - MaxRetries: f.config.Endpoints[i].MaxBackoffRetries, - }) - err error - ) + for _, e := range f.endpoints { wg.Add(1) go func() { - defer f.observeLatency(f.config.Endpoints[i].URL, "push_endpoint")() + var ( + b = backoff.New(ctx, backoff.Config{ + MinBackoff: e.options.MinBackoff, + MaxBackoff: e.options.MaxBackoff, + MaxRetries: e.options.MaxBackoffRetries, + }) + err error + ) + defer f.observeLatency(e.options.URL, req.op+"_endpoint")() defer wg.Done() - req := connect.NewRequest(req.Msg) - for k, v := range f.config.Endpoints[i].Headers { - req.Header().Set(k, v) - } + for { - err = func() error { - defer f.observeLatency(f.config.Endpoints[i].URL, "push_downstream")() - ctx, cancel := context.WithTimeout(ctx, f.config.Endpoints[i].RemoteTimeout) - defer cancel() - - _, err := client.Push(ctx, req) - return err - }() + err = f.forwardDownstream(ctx, l, e, req) if err == nil { - f.metrics.sentBytes.WithLabelValues(f.config.Endpoints[i].URL).Add(float64(reqSize)) - f.metrics.sentProfiles.WithLabelValues(f.config.Endpoints[i].URL).Add(float64(profileCount)) + f.metrics.sentBytes.WithLabelValues(e.options.URL).Add(float64(req.reqSize)) + f.metrics.sentProfiles.WithLabelValues(e.options.URL).Add(float64(req.profileCount)) break } - _ = level.Debug(l).Log("msg", - "failed to push to endpoint", - "endpoint", f.config.Endpoints[i].URL, - "retries", backoff.NumRetries(), + _ = level.Debug(l).Log( + "msg", "failed to forward to endpoint", + "endpoint", e.options.URL, + "retries", b.NumRetries(), "err", err, ) if !shouldRetry(err) { break } - backoff.Wait() - if !backoff.Ongoing() { + b.Wait() + if !b.Ongoing() { break } - f.metrics.retries.WithLabelValues(f.config.Endpoints[i].URL).Inc() + f.metrics.retries.WithLabelValues(e.options.URL).Inc() } if err != nil { - f.metrics.droppedBytes.WithLabelValues(f.config.Endpoints[i].URL).Add(float64(reqSize)) - f.metrics.droppedProfiles.WithLabelValues(f.config.Endpoints[i].URL).Add(float64(profileCount)) - err = fmt.Errorf("failed to push to endpoint %s (%d retries): %w", f.config.Endpoints[i].URL, backoff.NumRetries(), err) + f.metrics.droppedBytes.WithLabelValues(e.options.URL).Add(float64(req.reqSize)) + f.metrics.droppedProfiles.WithLabelValues(e.options.URL).Add(float64(req.profileCount)) + err = fmt.Errorf("failed to forward to endpoint %s (%d retries): %w", e.options.URL, b.NumRetries(), err) util.ErrorsJoinConcurrent(&errs, err, &errorMut) } }() } wg.Wait() - if errs != nil { - return nil, errs + return errs +} + +// Push implements the PusherServiceClient interface. +func (f *fanOutClient) Push( + ctx context.Context, + req *connect.Request[pushv1.PushRequest], +) (*connect.Response[pushv1.PushResponse], error) { + + reqSize, profileCount := requestSize(req) + err := f.forward(ctx, forwardRequest{ + op: "push", + reqSize: reqSize, + profileCount: profileCount, + impl: func(ctx context.Context, l log.Logger, e endpoint) error { + return f.pushDownstream(ctx, e, req.Msg) + }, + }) + if err != nil { + return nil, err } return connect.NewResponse(&pushv1.PushResponse{}), nil } @@ -393,7 +418,7 @@ func (f *fanOutClient) Append(ctx context.Context, lbs labels.Labels, samples [] // todo(ctovena): we should probably pool the label pair arrays and label builder to avoid allocs. var ( - protoLabels = make([]*typesv1.LabelPair, 0, len(lbs)+len(f.config.ExternalLabels)) + protoLabels = make([]*typesv1.LabelPair, 0, len(lbs)+len(f.arguments.ExternalLabels)) protoSamples = make([]*pushv1.RawSample, 0, len(samples)) lbsBuilder = labels.NewBuilder(nil) ) @@ -408,7 +433,7 @@ func (f *fanOutClient) Append(ctx context.Context, lbs labels.Labels, samples [] } lbsBuilder.Set(label.Name, label.Value) } - for name, value := range f.config.ExternalLabels { + for name, value := range f.arguments.ExternalLabels { lbsBuilder.Set(name, value) } for _, l := range lbsBuilder.Labels() { @@ -454,39 +479,6 @@ func (e *PyroscopeWriteError) readBody(resp *http.Response) { // AppendIngest implements the pyroscope.Appender interface. func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.IncomingProfile) error { - defer f.observeLatency("-", "ingest_total")() - - ctx, sp := f.tracer.Start(ctx, "AppendIngest") - defer sp.End() - - var ( - wg sync.WaitGroup - errs error - errorMut sync.Mutex - dl any - ok bool - reqSize, profileCount = int64(len(profile.RawBody)), int64(1) - l = util.TraceLog(f.logger, sp) - st = time.Now() - ) - if dl, ok = ctx.Deadline(); !ok { - dl = "none" - } - defer func() { - if errs != nil { - l = level.Warn(log.With(l, "err", errs)) - } else { - l = level.Debug(l) - } - _ = l.Log( - "msg", "AppendIngest", - "sz", reqSize, - "n", profileCount, - "dl", dl, - "st", st, - ) - }() - // Handle labels query := profile.URL.Query() ls := labelset.New(make(map[string]string)) @@ -502,113 +494,19 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco }) // Add external labels (which will override any existing ones) - for k, v := range f.config.ExternalLabels { + for k, v := range f.arguments.ExternalLabels { ls.Add(k, v) } query.Set("name", ls.Normalized()) - // Send to each endpoint concurrently - for endpointIdx, endpoint := range f.config.Endpoints { - var ( - endpoint = endpoint - i = endpointIdx - backoff = backoff.New(ctx, backoff.Config{ - MinBackoff: f.config.Endpoints[i].MinBackoff, - MaxBackoff: f.config.Endpoints[i].MaxBackoff, - MaxRetries: f.config.Endpoints[i].MaxBackoffRetries, - }) - err error - ) - wg.Add(1) - go func() { - defer f.observeLatency(endpoint.URL, "ingest_endpoint")() - defer wg.Done() - for { - err = func() error { - defer f.observeLatency(endpoint.URL, "ingest_downstream")() - u, err := url.Parse(endpoint.URL) - if err != nil { - return fmt.Errorf("parse URL: %w", err) - } - - u.Path = path.Join(u.Path, profile.URL.Path) - - // attach labels - u.RawQuery = query.Encode() - - ctx, cancel := context.WithTimeout(ctx, f.config.Endpoints[i].RemoteTimeout) - defer cancel() - - req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewReader(profile.RawBody)) - if err != nil { - return fmt.Errorf("create request: %w", err) - } - - // set headers from endpoint - for k, v := range endpoint.Headers { - req.Header.Set(k, v) - } - - // now set profile content type, overwrite what existed - for idx := range profile.ContentType { - if idx == 0 { - req.Header.Set(pyroscope.HeaderContentType, profile.ContentType[idx]) - continue - } - req.Header.Add(pyroscope.HeaderContentType, profile.ContentType[idx]) - } - - resp, err := f.ingestClients[endpoint].Do(req) - if err != nil { - return fmt.Errorf("do request: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - wErr := &PyroscopeWriteError{StatusCode: resp.StatusCode} - wErr.readBody(resp) - return fmt.Errorf("remote error: %w", wErr) - } - - // Ensure full body is read to keep http connection Keep-Alive - _, err = io.Copy(io.Discard, resp.Body) - if err != nil { - return fmt.Errorf("reading response body: %w", err) - } - - return nil - }() - if err == nil { - f.metrics.sentBytes.WithLabelValues(f.config.Endpoints[i].URL).Add(float64(reqSize)) - f.metrics.sentProfiles.WithLabelValues(f.config.Endpoints[i].URL).Add(float64(profileCount)) - break - } - _ = level.Debug(l).Log( - "msg", "failed to ingest to endpoint", - "endpoint", f.config.Endpoints[i].URL, - "retries", backoff.NumRetries(), - "err", err) - if !shouldRetry(err) { - break - } - backoff.Wait() - if !backoff.Ongoing() { - break - } - f.metrics.retries.WithLabelValues(f.config.Endpoints[i].URL).Inc() - } - if err != nil { - f.metrics.droppedBytes.WithLabelValues(f.config.Endpoints[i].URL).Add(float64(reqSize)) - f.metrics.droppedProfiles.WithLabelValues(f.config.Endpoints[i].URL).Add(float64(profileCount)) - err = fmt.Errorf("failed to ingest to endpoint %s (%d retries): %w", f.config.Endpoints[i].URL, backoff.NumRetries(), err) - util.ErrorsJoinConcurrent(&errs, err, &errorMut) - } - }() - } - - wg.Wait() - - return errs + return f.forward(ctx, forwardRequest{ + op: "ingest", + reqSize: int64(len(profile.RawBody)), + profileCount: 1, + impl: func(ctx context.Context, l log.Logger, e endpoint) error { + return f.ingestDownstream(ctx, e, profile, query) + }, + }) } func (f *fanOutClient) observeLatency(endpoint, latencyType string) func() { @@ -618,34 +516,6 @@ func (f *fanOutClient) observeLatency(endpoint, latencyType string) func() { } } -// WithUserAgent returns a `connect.ClientOption` that sets the User-Agent header on. -func WithUserAgent(agent string) connect.ClientOption { - return connect.WithInterceptors(&agentInterceptor{agent}) -} - -type agentInterceptor struct { - agent string -} - -func (i *agentInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { - return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { - req.Header().Set("User-Agent", i.agent) - return next(ctx, req) - } -} - -func (i *agentInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc { - return func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn { - conn := next(ctx, spec) - conn.RequestHeader().Set("User-Agent", i.agent) - return conn - } -} - -func (i *agentInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc { - return next -} - func ensureNameMatchesService(lbls labels.Labels) labels.Labels { if serviceName := lbls.Get(pyroscope.LabelServiceName); serviceName != "" { builder := labels.NewBuilder(lbls) @@ -704,3 +574,70 @@ func configureTracing(config Arguments, httpClient *http.Client) { ) } } + +func (f *fanOutClient) forwardDownstream(ctx context.Context, l log.Logger, e endpoint, req forwardRequest) error { + defer f.observeLatency(e.options.URL, req.op+"_downstream")() + downstreamContext, cancel := context.WithTimeout(ctx, e.options.RemoteTimeout) + defer cancel() + + if !f.arguments.Tracing.HttpClientTraceAll && !f.arguments.Tracing.HttpClientTraceErrors { + return req.impl(downstreamContext, l, e) + } + ct := newClientTrace() + ctx = httptrace.WithClientTrace(ctx, ct.trace) + err := req.impl(ctx, l, e) + if f.arguments.Tracing.HttpClientTraceAll || (f.arguments.Tracing.HttpClientTraceErrors && err != nil) { + ct.flush(level.Debug(l)) + } + return err +} + +func (f *fanOutClient) pushDownstream(ctx context.Context, e endpoint, msg *pushv1.PushRequest) error { + req := connect.NewRequest(msg) + for k, v := range e.options.Headers { + req.Header().Set(k, v) + } + _, err := e.pushClient.Push(ctx, req) + return err +} + +func (f *fanOutClient) ingestDownstream(ctx context.Context, e endpoint, profile *pyroscope.IncomingProfile, query url.Values) error { + u := *e.url + u.Path = path.Join(u.Path, profile.URL.Path) + u.RawQuery = query.Encode() + + req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewReader(profile.RawBody)) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + + for k, v := range e.options.Headers { + req.Header.Set(k, v) + } + + for idx := range profile.ContentType { + if idx == 0 { + req.Header.Set(pyroscope.HeaderContentType, profile.ContentType[idx]) + continue + } + req.Header.Add(pyroscope.HeaderContentType, profile.ContentType[idx]) + } + + resp, err := e.ingestClient.Do(req) + if err != nil { + return fmt.Errorf("do request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + wErr := &PyroscopeWriteError{StatusCode: resp.StatusCode} + wErr.readBody(resp) + return fmt.Errorf("remote error: %w", wErr) + } + + _, err = io.Copy(io.Discard, resp.Body) + if err != nil { + return fmt.Errorf("reading response body: %w", err) + } + return nil +} diff --git a/internal/component/pyroscope/write/write_test.go b/internal/component/pyroscope/write/write_test.go index 6e72df6af5..0e2893718e 100644 --- a/internal/component/pyroscope/write/write_test.go +++ b/internal/component/pyroscope/write/write_test.go @@ -1,18 +1,23 @@ package write import ( + "bytes" "context" "errors" + "fmt" "io" "net/http" "net/http/httptest" "net/url" "strconv" + "strings" "sync" "testing" "time" "connectrpc.com/connect" + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component/common/config" "github.com/grafana/alloy/internal/component/pyroscope" "github.com/grafana/alloy/internal/util" "github.com/grafana/alloy/syntax" @@ -574,6 +579,172 @@ func Test_Write_AppendIngest(t *testing.T) { suite.Run(t, new(AppendIngestTestSuite)) } +func Test_Write_HttpClientTrace(t *testing.T) { + testCases := []struct { + name string + traceAll bool + traceErrors bool + simulateError bool + expectTraceLogs bool + expectedLogMessages []string + }{ + { + name: "all_requests_tracing_enabled_success", + traceAll: true, + traceErrors: false, + simulateError: false, + expectTraceLogs: true, + expectedLogMessages: []string{"GetConn", "GotConn", "WroteHeaders", "GotFirstResponseByte"}, + }, + { + name: "all_requests_tracing_enabled_error", + traceAll: true, + traceErrors: false, + simulateError: true, + expectTraceLogs: true, + expectedLogMessages: []string{"GetConn", "GotConn", "WroteHeaders"}, + }, + { + name: "error_only_tracing_success", + traceAll: false, + traceErrors: true, + simulateError: false, + expectTraceLogs: false, + expectedLogMessages: []string{}, + }, + { + name: "error_only_tracing_error", + traceAll: false, + traceErrors: true, + simulateError: true, + expectTraceLogs: true, + expectedLogMessages: []string{"GetConn", "GotConn", "WroteHeaders"}, + }, + { + name: "tracing_disabled_success", + traceAll: false, + traceErrors: false, + simulateError: false, + expectTraceLogs: false, + expectedLogMessages: []string{}, + }, + { + name: "tracing_disabled_error", + traceAll: false, + traceErrors: false, + simulateError: true, + expectTraceLogs: false, + expectedLogMessages: []string{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + logBuf := bytes.NewBuffer(nil) + l := log.NewSyncLogger(log.NewLogfmtLogger(logBuf)) + + handler := http.NewServeMux() + pushpath, pushHandler := pushv1connect.NewPusherServiceHandler(PushFunc( + func(_ context.Context, req *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error) { + if tc.simulateError { + return nil, connect.NewError(connect.CodeInternal, errors.New("simulated error")) + } + return &connect.Response[pushv1.PushResponse]{}, nil + }, + )) + handler.Handle(pushpath, pushHandler) + handler.HandleFunc("/ingest", func(w http.ResponseWriter, r *http.Request) { + if tc.simulateError { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + }) + + server := httptest.NewServer(handler) + defer server.Close() + + argument := DefaultArguments() + argument.Tracing.HttpClientTraceAll = tc.traceAll + argument.Tracing.HttpClientTraceErrors = tc.traceErrors + argument.Endpoints = []*EndpointOptions{{ + URL: server.URL, + RemoteTimeout: GetDefaultEndpointOptions().RemoteTimeout, + MaxBackoffRetries: -1, + }} + + var export Exports + var wg sync.WaitGroup + wg.Add(1) + c, _ := New( + l, + noop.Tracer{}, + prometheus.NewRegistry(), + func(e Exports) { + defer wg.Done() + export = e + }, + "", + "", + argument, + ) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + go c.Run(ctx) + wg.Wait() + require.NotNil(t, export.Receiver) + for _, push := range []bool{true, false} { + name := "push" + if !push { + name = "ingest" + } + t.Run(name, func(t *testing.T) { + var err error + if push { + err = export.Receiver.Appender().Append(ctx, labels.FromMap(map[string]string{ + "__name__": "test", + "job": "foo", + }), []*pyroscope.RawSample{ + {ID: "test-request-id", RawProfile: []byte("pprofraw")}, + }) + } else { + err = export.Receiver.Appender().AppendIngest(ctx, &pyroscope.IncomingProfile{ + RawBody: []byte("pprofraw"), + Labels: labels.FromMap(map[string]string{"__name__": "test", "job": "foo"}), + URL: &url.URL{Path: "/ingest"}, + }) + } + + if tc.simulateError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + if tc.expectTraceLogs { + foundTraceLog := false + for _, expectedMsg := range tc.expectedLogMessages { + if strings.Contains(logBuf.String(), fmt.Sprintf("msg=%s", expectedMsg)) { + foundTraceLog = true + break + } + } + require.True(t, foundTraceLog, + "Expected to find at least one trace log message from %v in logs: %s", + tc.expectedLogMessages, logBuf.String()) + } else { + for _, expectedMsg := range []string{"GetConn", "GotConn", "WroteHeaders", "GotFirstResponseByte"} { + require.NotContains(t, logBuf.String(), fmt.Sprintf("msg=%s", expectedMsg), + "Expected no trace logs but found %s in: %s", expectedMsg, logBuf.String()) + } + } + }) + } + }) + } +} + func Test_Write_FanOut_ValidateLabels(t *testing.T) { _, handler := pushv1connect.NewPusherServiceHandler(PushFunc( func(_ context.Context, req *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error) { @@ -683,3 +854,93 @@ func Test_Write_FanOut_ValidateLabels(t *testing.T) { }) } } + +func Test_ClientTrace_ConcurrentAccess_TLS_DataRace(t *testing.T) { + logBuf := bytes.NewBuffer(nil) + l := log.NewSyncLogger(log.NewLogfmtLogger(logBuf)) + + handler := http.NewServeMux() + handler.HandleFunc("/ingest", func(w http.ResponseWriter, r *http.Request) { + responseSize := (r.ContentLength % 5) + 1 + responseBody := strings.Repeat("x", int(responseSize*1000)) + + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(responseBody)) + }) + + server := httptest.NewTLSServer(handler) + defer server.Close() + + argument := DefaultArguments() + argument.Tracing.HttpClientTraceAll = true + argument.Endpoints = []*EndpointOptions{{ + URL: server.URL, + RemoteTimeout: GetDefaultEndpointOptions().RemoteTimeout, + HTTPClientConfig: func() *config.HTTPClientConfig { + cfg := config.CloneDefaultHTTPClientConfig() + cfg.TLSConfig.InsecureSkipVerify = true // Trust the self-signed certificate + return cfg + }(), + }} + + var wg sync.WaitGroup + var export Exports + wg.Add(1) + c, err := New( + l, + noop.Tracer{}, + prometheus.NewRegistry(), + func(e Exports) { + defer wg.Done() + export = e + }, + "Alloy/239", + "", + argument, + ) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + go c.Run(ctx) + wg.Wait() + require.NotNil(t, export.Receiver) + + const numGoroutines = 100 + const requestsPerGoroutine = 10 + + var testWg sync.WaitGroup + testWg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func(goroutineID int) { + defer testWg.Done() + for j := 0; j < requestsPerGoroutine; j++ { + profileSize := (goroutineID*j)%10 + 1 + profileData := bytes.Repeat([]byte("profile-data"), profileSize*100) + profile := &pyroscope.IncomingProfile{ + RawBody: profileData, + Labels: labels.FromMap(map[string]string{ + "__name__": "test-concurrent", + "goroutine_id": fmt.Sprintf("%d", goroutineID), + "request_id": fmt.Sprintf("%d", j), + }), + URL: &url.URL{Path: "/ingest"}, + ContentType: []string{"application/octet-stream"}, + } + + err := export.Receiver.Appender().AppendIngest(ctx, profile) + require.NoError(t, err) + } + }(i) + } + + testWg.Wait() + + logOutput := logBuf.String() + require.Contains(t, logOutput, "msg=GetConn") + require.Contains(t, logOutput, "msg=GotConn") + require.Contains(t, logOutput, "msg=TLSHandshakeStart") + require.Contains(t, logOutput, "msg=TLSHandshakeDone") +}