Skip to content

Commit 61129ea

Browse files
move the helper to pyroscope package
1 parent 75cfd9a commit 61129ea

File tree

6 files changed

+78
-75
lines changed

6 files changed

+78
-75
lines changed

internal/cmd/integration-tests/common/profiles_assert.go

Lines changed: 0 additions & 52 deletions
This file was deleted.
Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,57 @@
11
package main
22

33
import (
4+
"strings"
45
"testing"
6+
"time"
57

68
"github.com/grafana/alloy/internal/cmd/integration-tests/common"
9+
pyroutil "github.com/grafana/alloy/internal/component/pyroscope/util/test"
10+
querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
11+
"github.com/stretchr/testify/require"
712
)
813

914
func TestPyroscopeJavaKafka(t *testing.T) {
10-
q := `{service_name="integration-test/java/kafka"}`
11-
common.AssertPyroscopeProfile(
12-
t,
13-
`process_cpu:cpu:nanoseconds:cpu:nanoseconds`,
14-
q,
15-
[]string{
16-
`kafka/server/KafkaRequestHandler.run`,
17-
`libjvm.so.JavaThread::thread_main_inner`,
18-
},
19-
)
20-
common.AssertPyroscopeProfile(
21-
t,
22-
"memory:alloc_in_new_tlab_bytes:bytes:space:bytes",
23-
q,
24-
[]string{
25-
`kafka/server/KafkaRequestHandler.run`,
26-
},
27-
)
15+
16+
require.Eventually(t, func() bool {
17+
req := &querierv1.SelectMergeProfileRequest{
18+
ProfileTypeID: `process_cpu:cpu:nanoseconds:cpu:nanoseconds`,
19+
LabelSelector: `{service_name="integration-test/java/kafka"}`,
20+
Start: time.Now().Add(-time.Hour).UnixMilli(),
21+
End: time.Now().UnixMilli(),
22+
}
23+
res, err := pyroutil.Query("http://localhost:4040", req)
24+
if err != nil {
25+
return false
26+
}
27+
ss := res.String()
28+
if !strings.Contains(ss, `kafka/server/KafkaRequestHandler.run`) {
29+
return false
30+
}
31+
if !strings.Contains(ss, `libjvm.so.JavaThread::thread_main_inner`) {
32+
return false
33+
}
34+
return true
35+
}, common.DefaultTimeout, common.DefaultRetryInterval)
36+
37+
require.Eventually(t, func() bool {
38+
req := &querierv1.SelectMergeProfileRequest{
39+
ProfileTypeID: `memory:alloc_in_new_tlab_bytes:bytes:space:bytes`,
40+
LabelSelector: `{service_name="integration-test/java/kafka"}`,
41+
Start: time.Now().Add(-time.Hour).UnixMilli(),
42+
End: time.Now().UnixMilli(),
43+
}
44+
res, err := pyroutil.Query("http://localhost:4040", req)
45+
if err != nil {
46+
return false
47+
}
48+
ss := res.String()
49+
if !strings.Contains(ss, `kafka/server/KafkaRequestHandler.run`) {
50+
return false
51+
}
52+
if strings.Contains(ss, `libjvm.so.JavaThread::thread_main_inner`) {
53+
return false
54+
}
55+
return true
56+
}, common.DefaultTimeout, common.DefaultRetryInterval)
2857
}

internal/component/pyroscope/receive_http/receive_http.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/go-kit/log"
1515
"github.com/gorilla/mux"
1616
pyroutil "github.com/grafana/alloy/internal/component/pyroscope/util"
17+
"github.com/grafana/alloy/internal/component/pyroscope/util/tracelog"
1718
"github.com/prometheus/client_golang/prometheus"
1819
"github.com/prometheus/prometheus/model/labels"
1920
"go.opentelemetry.io/otel/trace"
@@ -170,7 +171,7 @@ func (c *Component) Push(ctx context.Context, req *connect.Request[pushv1.PushRe
170171

171172
ctx, sp := c.tracer.Start(ctx, "/push.v1.PusherService/Push")
172173
defer sp.End()
173-
l := pyroutil.TraceLog(c.logger, sp)
174+
l := tracelog.TraceLog(c.logger, sp)
174175

175176
var wg sync.WaitGroup
176177
var errs error
@@ -224,7 +225,7 @@ func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
224225
ctx, sp := c.tracer.Start(ctx, "/ingest")
225226
defer sp.End()
226227

227-
l := pyroutil.TraceLog(c.logger, sp)
228+
l := tracelog.TraceLog(c.logger, sp)
228229

229230
// Parse labels early
230231
var lbls labels.Labels
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package test
2+
3+
import (
4+
"context"
5+
"net/http"
6+
7+
"connectrpc.com/connect"
8+
"github.com/google/pprof/profile"
9+
querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
10+
"github.com/grafana/pyroscope/api/gen/proto/go/querier/v1/querierv1connect"
11+
)
12+
13+
func Query(url string, q *querierv1.SelectMergeProfileRequest) (*profile.Profile, error) {
14+
client := querierv1connect.NewQuerierServiceClient(http.DefaultClient, url)
15+
res, err := client.SelectMergeProfile(context.Background(), connect.NewRequest(q))
16+
if err != nil {
17+
return nil, err
18+
}
19+
bs, err := res.Msg.MarshalVT()
20+
if err != nil {
21+
return nil, err
22+
}
23+
return profile.ParseData(bs)
24+
}

internal/component/pyroscope/util/log.go renamed to internal/component/pyroscope/util/tracelog/log.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package util
1+
package tracelog
22

33
import (
44
"github.com/go-kit/log"

internal/component/pyroscope/write/write.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/grafana/alloy/internal/component/common/config"
2323
"github.com/grafana/alloy/internal/component/pyroscope"
2424
"github.com/grafana/alloy/internal/component/pyroscope/util"
25+
"github.com/grafana/alloy/internal/component/pyroscope/util/tracelog"
2526
"github.com/grafana/alloy/internal/featuregate"
2627
"github.com/grafana/alloy/internal/useragent"
2728
"github.com/grafana/dskit/backoff"
@@ -260,7 +261,7 @@ func (f *fanOutClient) Push(
260261
dl any
261262
ok bool
262263
reqSize, profileCount = requestSize(req)
263-
l = util.TraceLog(f.logger, sp)
264+
l = tracelog.TraceLog(f.logger, sp)
264265
st = time.Now()
265266
)
266267
if dl, ok = ctx.Deadline(); !ok {
@@ -466,7 +467,7 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco
466467
dl any
467468
ok bool
468469
reqSize, profileCount = int64(len(profile.RawBody)), int64(1)
469-
l = util.TraceLog(f.logger, sp)
470+
l = tracelog.TraceLog(f.logger, sp)
470471
st = time.Now()
471472
)
472473
if dl, ok = ctx.Deadline(); !ok {

0 commit comments

Comments
 (0)