Skip to content

Commit a80f90b

Browse files
committed
Support remote write v2 by converting request
Signed-off-by: SungJin1212 <[email protected]>
1 parent 0e85ae0 commit a80f90b

File tree

18 files changed

+1361
-115
lines changed

18 files changed

+1361
-115
lines changed

.github/workflows/test-build-deploy.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ jobs:
162162
- integration_querier
163163
- integration_ruler
164164
- integration_query_fuzz
165+
- integration_remote_write_v2
165166
steps:
166167
- name: Upgrade golang
167168
uses: actions/setup-go@0aaccfd150d50ccaeb58ebd88d36e91967a5f35b # v5.4.0

.golangci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,4 @@ run:
5050
- integration_querier
5151
- integration_ruler
5252
- integration_query_fuzz
53+
- integration_remote_write_v2

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
77
* [FEATURE] Update prometheus alertmanager version to v0.28.0 and add new integration msteamsv2, jira, and rocketchat. #6590
88
* [FEATURE] Ingester/StoreGateway: Add `ResourceMonitor` module in Cortex, and add `ResourceBasedLimiter` in Ingesters and StoreGateways. #6674
9+
* [FEATURE] Support Prometheus remote write 2.0. #6330
910
* [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
1011
* [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680
1112
* [FEATURE] Ruler: Add support for group labels. #6665

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2694,6 +2694,11 @@ ha_tracker:
26942694
# CLI flag: -distributor.sign-write-requests
26952695
[sign_write_requests: <boolean> | default = false]
26962696
2697+
# EXPERIMENTAL: If true, accept prometheus remote write v2 protocol push
2698+
# request.
2699+
# CLI flag: -distributor.remote-write2-enabled
2700+
[remote_write2_enabled: <boolean> | default = false]
2701+
26972702
ring:
26982703
kvstore:
26992704
# Backend storage to use for the ring. Supported values are: consul, etcd,

docs/configuration/v1-guarantees.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ Currently experimental features are:
5959
- Distributor:
6060
- Do not extend writes on unhealthy ingesters (`-distributor.extend-writes=false`)
6161
- Accept multiple HA pairs in the same request (enabled via `-experimental.distributor.ha-tracker.mixed-ha-samples=true`)
62+
- Accept Prometheus remote write 2.0 request (`-distributor.remote-write2-enabled=true`)
6263
- Tenant Deletion in Purger, for blocks storage.
6364
- Query-frontend: query stats tracking (`-frontend.query-stats-enabled`)
6465
- Blocks storage bucket index

integration/e2e/util.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/prometheus/prometheus/model/histogram"
2020
"github.com/prometheus/prometheus/model/labels"
2121
"github.com/prometheus/prometheus/prompb"
22+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
2223
"github.com/prometheus/prometheus/storage"
2324
"github.com/prometheus/prometheus/tsdb"
2425
"github.com/prometheus/prometheus/tsdb/tsdbutil"
@@ -423,3 +424,78 @@ func CreateBlock(
423424

424425
return id, nil
425426
}
427+
428+
func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries) {
429+
tsMillis := TimeToMilliseconds(ts)
430+
431+
st := writev2.NewSymbolTable()
432+
433+
lbs := labels.Labels{labels.Label{Name: "__name__", Value: name}}
434+
for _, lbl := range additionalLabels {
435+
lbs = append(lbs, labels.Label{Name: lbl.Name, Value: lbl.Value})
436+
}
437+
438+
var (
439+
h *histogram.Histogram
440+
fh *histogram.FloatHistogram
441+
ph writev2.Histogram
442+
)
443+
if floatHistogram {
444+
fh = tsdbutil.GenerateTestFloatHistogram(int64(i))
445+
ph = writev2.FromFloatHistogram(tsMillis, fh)
446+
} else {
447+
h = tsdbutil.GenerateTestHistogram(int64(i))
448+
ph = writev2.FromIntHistogram(tsMillis, h)
449+
}
450+
451+
// Generate the series
452+
series = append(series, writev2.TimeSeries{
453+
LabelsRefs: st.SymbolizeLabels(lbs, nil),
454+
Histograms: []writev2.Histogram{ph},
455+
})
456+
457+
symbols = st.Symbols()
458+
459+
return
460+
}
461+
462+
func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries, vector model.Vector) {
463+
tsMillis := TimeToMilliseconds(ts)
464+
value := rand.Float64()
465+
466+
st := writev2.NewSymbolTable()
467+
lbs := labels.Labels{{Name: labels.MetricName, Value: name}}
468+
469+
for _, label := range additionalLabels {
470+
lbs = append(lbs, labels.Label{
471+
Name: label.Name,
472+
Value: label.Value,
473+
})
474+
}
475+
series = append(series, writev2.TimeSeries{
476+
// Generate the series
477+
LabelsRefs: st.SymbolizeLabels(lbs, nil),
478+
Samples: []writev2.Sample{
479+
{Value: value, Timestamp: tsMillis},
480+
},
481+
Metadata: writev2.Metadata{
482+
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
483+
},
484+
})
485+
symbols = st.Symbols()
486+
487+
// Generate the expected vector when querying it
488+
metric := model.Metric{}
489+
metric[labels.MetricName] = model.LabelValue(name)
490+
for _, lbl := range additionalLabels {
491+
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
492+
}
493+
494+
vector = append(vector, &model.Sample{
495+
Metric: metric,
496+
Value: model.SampleValue(value),
497+
Timestamp: model.Time(tsMillis),
498+
})
499+
500+
return
501+
}

integration/e2ecortex/client.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/prometheus/prometheus/model/labels"
2525
"github.com/prometheus/prometheus/model/rulefmt"
2626
"github.com/prometheus/prometheus/prompb"
27+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
2728
"github.com/prometheus/prometheus/storage"
2829
"github.com/prometheus/prometheus/storage/remote"
2930
yaml "gopkg.in/yaml.v3"
@@ -147,6 +148,39 @@ func (c *Client) Push(timeseries []prompb.TimeSeries, metadata ...prompb.MetricM
147148
return res, nil
148149
}
149150

151+
// PushV2 the input timeseries to the remote endpoint
152+
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) {
153+
// Create write request
154+
data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries})
155+
if err != nil {
156+
return nil, err
157+
}
158+
159+
// Create HTTP request
160+
compressed := snappy.Encode(nil, data)
161+
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
162+
if err != nil {
163+
return nil, err
164+
}
165+
166+
req.Header.Add("Content-Encoding", "snappy")
167+
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
168+
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
169+
req.Header.Set("X-Scope-OrgID", c.orgID)
170+
171+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
172+
defer cancel()
173+
174+
// Execute HTTP request
175+
res, err := c.httpClient.Do(req.WithContext(ctx))
176+
if err != nil {
177+
return nil, err
178+
}
179+
180+
defer res.Body.Close()
181+
return res, nil
182+
}
183+
150184
func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {
151185
var metricName string
152186
attributes := make(map[string]any)

0 commit comments

Comments
 (0)