Skip to content

Commit 29c8497

Browse files
change config to specify timestamp/cursor per query
1 parent 89e6c45 commit 29c8497

File tree

2 files changed

+40
-21
lines changed

2 files changed

+40
-21
lines changed

x-pack/filebeat/input/gcpbigquery/config.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,31 +14,36 @@ type config struct {
1414
// The ID of the GCP project that owns the BigQuery dataset.
1515
ProjectID string `config:"project_id" validate:"required"`
1616

17-
// The BigQuery SQL queries to execute.
18-
Queries []string `config:"queries" validate:"required"`
17+
// JSON file containing authentication credentials and key.
18+
CredentialsFile string `config:"credentials_file"`
19+
20+
// The BigQuery queries to execute, along with options for each.
21+
Queries []queryConfig `config:"queries" validate:"required"`
22+
}
23+
24+
type queryConfig struct {
25+
// The SQL query to execute.
26+
Query string `config:"query" validate:"required"`
1927

2028
// The name of a field in the target BigQuery table that can be used to simulate cursor pagination, e.g. an incremental ID or timestamp.
2129
// The following field types are supported: BIGNUMERIC, BYTES, DATE, DATETIME, FLOAT, INTEGER, NUMERIC, STRING, TIME, TIMESTAMP.
2230
// If not specified, the input will run this query as-is on every poll. If specified, the input will add a WHERE clause
2331
// to this query to only select rows where the cursor field's value is greater than the last seen value.
2432
CursorField string `config:"cursor_field"`
2533

26-
// JSON file containing authentication credentials and key.
27-
CredentialsFile string `config:"credentials_file"`
34+
// A TIMESTAMP field in the target BigQuery table to use as the event's @timestamp value.
35+
TimestampField string `config:"timestamp_field"`
2836

2937
// Whether to attempt to parse fields of type JSON into objects/arrays instead of leaving them as strings.
3038
// In the event of parsing failures, we still expand the field into a JSON object with a single field named
3139
// "original" containing the original string value; this avoids mapping conflicts in Elasticsearch.
32-
ExpandJsonStrings bool `config:"expand_json_strings"`
33-
34-
// A TIMESTAMP field in the target BigQuery table to use as the event's @timestamp value.
35-
TimestampField string `config:"timestamp_field"`
40+
// Defaults to true.
41+
ExpandJsonStrings *bool `config:"expand_json_strings"`
3642
}
3743

3844
func defaultConfig() config {
3945
return config{
40-
Period: time.Minute,
41-
ExpandJsonStrings: true,
46+
Period: time.Minute,
4247
}
4348
}
4449

x-pack/filebeat/input/gcpbigquery/input.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"cloud.google.com/go/bigquery"
9+
"github.com/cespare/xxhash/v2"
910
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
1011
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
1112
"github.com/elastic/beats/v7/libbeat/beat"
@@ -45,10 +46,18 @@ func configure(cfg *conf.C, logger *logp.Logger) ([]cursor.Source, cursor.Input,
4546

4647
var sources []cursor.Source
4748
for _, query := range config.Queries {
49+
// defaults to true
50+
expandJSON := true
51+
if query.ExpandJsonStrings != nil {
52+
expandJSON = *query.ExpandJsonStrings
53+
}
54+
4855
sources = append(sources, &bigQuerySource{
49-
ProjectID: config.ProjectID,
50-
Query: query,
51-
CursorField: config.CursorField,
56+
ProjectID: config.ProjectID,
57+
Query: query.Query,
58+
CursorField: query.CursorField,
59+
TimestampField: query.TimestampField,
60+
ExpandJson: expandJSON,
5261
})
5362
}
5463

@@ -63,13 +72,19 @@ func updateStatus(ctx v2.Context, status status.Status, msg string) {
6372

6473
// bigQuerySource defines the configuration for a single BigQuery query.
6574
type bigQuerySource struct {
66-
ProjectID string
67-
Query string
68-
CursorField string
75+
ProjectID string
76+
Query string
77+
CursorField string
78+
TimestampField string
79+
ExpandJson bool
6980
}
7081

7182
func (s *bigQuerySource) Name() string {
72-
return fmt.Sprintf("%s-%s-%s", s.ProjectID, s.Query, s.CursorField)
83+
// this string uniquely identifies the source in the state store.
84+
// configuration that doesn't affect the query/cursor itself should not be included.
85+
name := fmt.Sprintf("%s-%s-%s", s.ProjectID, s.Query, s.CursorField)
86+
// hash it to avoid unintentionally leaking queries into logs/files
87+
return fmt.Sprintf("%d", xxhash.Sum64String(name))
7388
}
7489

7590
type bigQueryInput struct {
@@ -182,7 +197,7 @@ func (bq *bigQueryInput) publishEvent(src *bigQuerySource, publisher cursor.Publ
182197
}
183198
}
184199

185-
if bq.config.TimestampField != "" && field.Name == bq.config.TimestampField {
200+
if src.TimestampField != "" && field.Name == src.TimestampField {
186201
bq.logger.Debugf("setting timestamp from field %s", field.Name)
187202

188203
ts, err := getTimestamp(field, v)
@@ -193,11 +208,10 @@ func (bq *bigQueryInput) publishEvent(src *bigQuerySource, publisher cursor.Publ
193208
}
194209
}
195210

196-
if bq.config.ExpandJsonStrings {
197-
bq.logger.Debugf("expanding JSON from field %s", field.Name)
198-
211+
if src.ExpandJson {
199212
val, err := expandJSON(field, v)
200213
if err == nil {
214+
bq.logger.Debugf("expanding JSON from field %s", field.Name)
201215
v = val
202216
} else {
203217
// on error, still expand into a nested object with the original string to avoid mapping conflicts

0 commit comments

Comments
 (0)