Skip to content

Commit 43351c5

Browse files
committed
Update scheduler query validation with unified query converter
1 parent ade3ece commit 43351c5

File tree

3 files changed

+218
-4
lines changed

3 files changed

+218
-4
lines changed

service/frontend/workflow_handler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4081,8 +4081,10 @@ func (wh *WorkflowHandler) ListSchedules(
40814081
}
40824082
if err := scheduler.ValidateVisibilityQuery(
40834083
namespaceName,
4084+
namespaceID,
40844085
saNameType,
40854086
wh.saMapperProvider,
4087+
wh.config.VisibilityEnableUnifiedQueryConverter,
40864088
request.Query,
40874089
); err != nil {
40884090
return nil, err

service/worker/scheduler/query.go

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66

77
"go.temporal.io/api/serviceerror"
8+
"go.temporal.io/server/common/dynamicconfig"
89
"go.temporal.io/server/common/namespace"
910
"go.temporal.io/server/common/persistence/visibility/store/elasticsearch"
1011
"go.temporal.io/server/common/persistence/visibility/store/query"
@@ -37,13 +38,44 @@ func newFieldNameAggInterceptor(
3738
}
3839
}
3940

41+
type saAggInterceptor struct {
42+
names map[string]bool
43+
}
44+
45+
var _ query.SearchAttributeInterceptor = (*saAggInterceptor)(nil)
46+
47+
func newSaAggInterceptor() *saAggInterceptor {
48+
return &saAggInterceptor{
49+
names: make(map[string]bool),
50+
}
51+
}
52+
53+
func (i *saAggInterceptor) Intercept(col *query.SAColName) error {
54+
i.names[col.Alias] = true
55+
return nil
56+
}
57+
4058
func ValidateVisibilityQuery(
4159
namespaceName namespace.Name,
60+
namespaceID namespace.ID,
4261
saNameType searchattribute.NameTypeMap,
4362
saMapperProvider searchattribute.MapperProvider,
63+
enableUnifiedQueryConverter dynamicconfig.BoolPropertyFn,
4464
queryString string,
4565
) error {
46-
fields, err := getQueryFields(namespaceName, saNameType, saMapperProvider, queryString)
66+
var fields []string
67+
var err error
68+
if enableUnifiedQueryConverter() {
69+
fields, err = getQueryFields(
70+
namespaceName,
71+
namespaceID,
72+
saNameType,
73+
saMapperProvider,
74+
queryString,
75+
)
76+
} else {
77+
fields, err = getQueryFieldsLegacy(namespaceName, saNameType, saMapperProvider, queryString)
78+
}
4779
if err != nil {
4880
return err
4981
}
@@ -58,6 +90,35 @@ func ValidateVisibilityQuery(
5890
}
5991

6092
func getQueryFields(
93+
namespaceName namespace.Name,
94+
namespaceID namespace.ID,
95+
saNameType searchattribute.NameTypeMap,
96+
saMapperProvider searchattribute.MapperProvider,
97+
queryString string,
98+
) ([]string, error) {
99+
saMapper, err := saMapperProvider.GetMapper(namespaceName)
100+
if err != nil {
101+
return nil, err
102+
}
103+
saInterceptor := newSaAggInterceptor()
104+
queryConverter := query.NewNilQueryConverter(
105+
namespaceName,
106+
namespaceID,
107+
saNameType,
108+
saMapper,
109+
).WithSearchAttributeInterceptor(saInterceptor)
110+
_, err = queryConverter.Convert(queryString)
111+
if err != nil {
112+
var converterErr *query.ConverterError
113+
if errors.As(err, &converterErr) {
114+
return nil, converterErr.ToInvalidArgument()
115+
}
116+
return nil, err
117+
}
118+
return expmaps.Keys(saInterceptor.names), nil
119+
}
120+
121+
func getQueryFieldsLegacy(
61122
namespaceName namespace.Name,
62123
saNameType searchattribute.NameTypeMap,
63124
saMapperProvider searchattribute.MapperProvider,

service/worker/scheduler/query_test.go

Lines changed: 154 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@ import (
44
"testing"
55

66
"github.com/stretchr/testify/require"
7+
enumspb "go.temporal.io/api/enums/v1"
78
"go.temporal.io/api/serviceerror"
9+
"go.temporal.io/server/common/dynamicconfig"
810
"go.temporal.io/server/common/namespace"
911
"go.temporal.io/server/common/persistence/visibility/store/query"
1012
"go.temporal.io/server/common/searchattribute"
1113
)
1214

1315
const (
14-
testNamespace = namespace.Name("test-namespace")
16+
testNamespace = namespace.Name("test-namespace")
17+
testNamespaceID = namespace.ID("test-namespace-id")
1518
)
1619

1720
func TestFieldNameAggInterceptor(t *testing.T) {
@@ -38,7 +41,62 @@ func TestFieldNameAggInterceptor(t *testing.T) {
3841
s.Error(err)
3942
}
4043

41-
func TestGetQueryFields(t *testing.T) {
44+
func TestFieldSaAggInterceptor(t *testing.T) {
45+
s := require.New(t)
46+
saInterceptor := newSaAggInterceptor()
47+
intCol := query.NewSAColName(
48+
"AliasForCustomIntField",
49+
"CustomIntField",
50+
enumspb.INDEXED_VALUE_TYPE_INT,
51+
)
52+
keywordCol := query.NewSAColName(
53+
"AliasForCustomKeywordField",
54+
"CustomKeywordField",
55+
enumspb.INDEXED_VALUE_TYPE_KEYWORD,
56+
)
57+
startTimeCol := query.NewSAColName(
58+
searchattribute.StartTime,
59+
searchattribute.StartTime,
60+
enumspb.INDEXED_VALUE_TYPE_DATETIME,
61+
)
62+
63+
err := saInterceptor.Intercept(intCol)
64+
s.NoError(err)
65+
s.Equal(map[string]bool{"AliasForCustomIntField": true}, saInterceptor.names)
66+
67+
err = saInterceptor.Intercept(keywordCol)
68+
s.NoError(err)
69+
s.Equal(
70+
map[string]bool{
71+
"AliasForCustomIntField": true,
72+
"AliasForCustomKeywordField": true,
73+
},
74+
saInterceptor.names,
75+
)
76+
77+
err = saInterceptor.Intercept(intCol)
78+
s.NoError(err)
79+
s.Equal(
80+
map[string]bool{
81+
"AliasForCustomIntField": true,
82+
"AliasForCustomKeywordField": true,
83+
},
84+
saInterceptor.names,
85+
)
86+
87+
err = saInterceptor.Intercept(startTimeCol)
88+
s.NoError(err)
89+
s.Equal(
90+
map[string]bool{
91+
"AliasForCustomIntField": true,
92+
"AliasForCustomKeywordField": true,
93+
searchattribute.StartTime: true,
94+
},
95+
saInterceptor.names,
96+
)
97+
}
98+
99+
func TestGetQueryFieldsLegacy(t *testing.T) {
42100
testCases := []struct {
43101
name string
44102
input string
@@ -101,13 +159,104 @@ func TestGetQueryFields(t *testing.T) {
101159
},
102160
}
103161

162+
for _, tc := range testCases {
163+
t.Run(
164+
tc.name,
165+
func(t *testing.T) {
166+
s := require.New(t)
167+
fields, err := getQueryFieldsLegacy(
168+
testNamespace,
169+
searchattribute.TestNameTypeMap,
170+
searchattribute.NewTestMapperProvider(nil),
171+
tc.input,
172+
)
173+
if tc.expectedErrMsg == "" {
174+
s.NoError(err)
175+
s.Equal(len(tc.expectedFields), len(fields))
176+
for _, f := range fields {
177+
s.Contains(tc.expectedFields, f)
178+
}
179+
} else {
180+
var invalidArgErr *serviceerror.InvalidArgument
181+
s.ErrorAs(err, &invalidArgErr)
182+
s.ErrorContains(err, tc.expectedErrMsg)
183+
}
184+
},
185+
)
186+
}
187+
}
188+
189+
func TestGetQueryFields(t *testing.T) {
190+
testCases := []struct {
191+
name string
192+
input string
193+
expectedFields []string
194+
expectedErrMsg string
195+
}{
196+
{
197+
name: "empty query string",
198+
input: "",
199+
expectedFields: []string{},
200+
expectedErrMsg: "",
201+
},
202+
{
203+
name: "filter custom search attribute",
204+
input: "CustomKeywordField = 'foo'",
205+
expectedFields: []string{"CustomKeywordField"},
206+
expectedErrMsg: "",
207+
},
208+
{
209+
name: "filter multiple custom search attribute",
210+
input: "(CustomKeywordField = 'foo' AND CustomIntField = 123) OR CustomKeywordField = 'bar'",
211+
expectedFields: []string{"CustomKeywordField", "CustomIntField"},
212+
expectedErrMsg: "",
213+
},
214+
{
215+
name: "filter TemporalSchedulePaused",
216+
input: "TemporalSchedulePaused = true",
217+
expectedFields: []string{"TemporalSchedulePaused"},
218+
expectedErrMsg: "",
219+
},
220+
{
221+
name: "filter TemporalSchedulePaused",
222+
input: "TemporalSchedulePaused = true",
223+
expectedFields: []string{"TemporalSchedulePaused"},
224+
expectedErrMsg: "",
225+
},
226+
{
227+
name: "filter TemporalSchedulePaused and custom search attribute",
228+
input: "TemporalSchedulePaused = true AND CustomKeywordField = 'foo'",
229+
expectedFields: []string{"TemporalSchedulePaused", "CustomKeywordField"},
230+
expectedErrMsg: "",
231+
},
232+
{
233+
name: "filter system search attribute",
234+
input: "ExecutionDuration > '1s'",
235+
expectedFields: []string{"ExecutionDuration"},
236+
expectedErrMsg: "",
237+
},
238+
{
239+
name: "invalid query filter",
240+
input: "CustomKeywordField = foo",
241+
expectedFields: nil,
242+
expectedErrMsg: "invalid query",
243+
},
244+
{
245+
name: "invalid custom search attribute",
246+
input: "Foo = 'bar'",
247+
expectedFields: nil,
248+
expectedErrMsg: "'Foo' is not a valid search attribute",
249+
},
250+
}
251+
104252
for _, tc := range testCases {
105253
t.Run(
106254
tc.name,
107255
func(t *testing.T) {
108256
s := require.New(t)
109257
fields, err := getQueryFields(
110258
testNamespace,
259+
testNamespaceID,
111260
searchattribute.TestNameTypeMap,
112261
searchattribute.NewTestMapperProvider(nil),
113262
tc.input,
@@ -177,7 +326,7 @@ func TestValidateVisibilityQuery(t *testing.T) {
177326
{
178327
name: "invalid custom search attribute",
179328
input: "Foo = foo",
180-
expectedErrMsg: "invalid search attribute: Foo",
329+
expectedErrMsg: "'Foo' is not a valid search attribute",
181330
},
182331
}
183332

@@ -188,8 +337,10 @@ func TestValidateVisibilityQuery(t *testing.T) {
188337
s := require.New(t)
189338
err := ValidateVisibilityQuery(
190339
testNamespace,
340+
testNamespaceID,
191341
searchattribute.TestNameTypeMap,
192342
searchattribute.NewTestMapperProvider(nil),
343+
dynamicconfig.GetBoolPropertyFn(true),
193344
tc.input,
194345
)
195346
if tc.expectedErrMsg == "" {

0 commit comments

Comments
 (0)