Skip to content

Commit 5bd1dbb

Browse files
Merge pull request #1346 from vimalk78/log-2255-label
LOG-2255: Vector: Support Pod Label based routing
2 parents 733e4b2 + 0e6fa59 commit 5bd1dbb

File tree

6 files changed

+191
-53
lines changed

6 files changed

+191
-53
lines changed

internal/generator/vector/conf_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ source = '''
172172
[transforms.route_container_logs]
173173
type = "route"
174174
inputs = ["container_logs"]
175-
route.app = '!(starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default")'
176-
route.infra = 'starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default"'
175+
route.app = '!((starts_with!(.kubernetes.pod_namespace,"kube")) || (starts_with!(.kubernetes.pod_namespace,"openshift")) || (.kubernetes.pod_namespace == "default"))'
176+
route.infra = '(starts_with!(.kubernetes.pod_namespace,"kube")) || (starts_with!(.kubernetes.pod_namespace,"openshift")) || (.kubernetes.pod_namespace == "default")'
177177
178178
179179
# Rename log stream to "application"
@@ -206,7 +206,7 @@ source = """
206206
[transforms.route_application_logs]
207207
type = "route"
208208
inputs = ["application"]
209-
route.mytestapp = '(.kubernetes.pod_namespace == "test-ns")'
209+
route.mytestapp = '.kubernetes.pod_namespace == "test-ns"'
210210
211211
212212
[transforms.pipeline]
@@ -367,8 +367,8 @@ source = '''
367367
[transforms.route_container_logs]
368368
type = "route"
369369
inputs = ["container_logs"]
370-
route.app = '!(starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default")'
371-
route.infra = 'starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default"'
370+
route.app = '!((starts_with!(.kubernetes.pod_namespace,"kube")) || (starts_with!(.kubernetes.pod_namespace,"openshift")) || (.kubernetes.pod_namespace == "default"))'
371+
route.infra = '(starts_with!(.kubernetes.pod_namespace,"kube")) || (starts_with!(.kubernetes.pod_namespace,"openshift")) || (.kubernetes.pod_namespace == "default")'
372372
373373
374374
# Rename log stream to "application"
@@ -703,8 +703,8 @@ source = '''
703703
[transforms.route_container_logs]
704704
type = "route"
705705
inputs = ["container_logs"]
706-
route.app = '!(starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default")'
707-
route.infra = 'starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default"'
706+
route.app = '!((starts_with!(.kubernetes.pod_namespace,"kube")) || (starts_with!(.kubernetes.pod_namespace,"openshift")) || (.kubernetes.pod_namespace == "default"))'
707+
route.infra = '(starts_with!(.kubernetes.pod_namespace,"kube")) || (starts_with!(.kubernetes.pod_namespace,"openshift")) || (.kubernetes.pod_namespace == "default")'
708708
709709
710710
# Rename log stream to "application"

internal/generator/vector/elements/elements.go

+23
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package elements
22

3+
import (
4+
"github.com/openshift/cluster-logging-operator/internal/generator"
5+
)
6+
37
type Route struct {
48
ComponentID string
59
Desc string
@@ -53,3 +57,22 @@ source = """
5357
{{end}}
5458
`
5559
}
60+
61+
func Debug(id string, inputs string) generator.Element {
62+
return generator.ConfLiteral{
63+
Desc: "Sending records to stdout for debug purposes",
64+
ComponentID: id,
65+
InLabel: inputs,
66+
TemplateName: "debug",
67+
TemplateStr: `
68+
{{define "debug" -}}
69+
[sinks.{{.ComponentID}}]
70+
inputs = {{.InLabel}}
71+
type = "console"
72+
target = "stdout"
73+
[sinks.{{.ComponentID}}.encoding]
74+
codec = "json"
75+
{{end}}
76+
`,
77+
}
78+
}

internal/generator/vector/output/loki/loki.go

+1-17
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66

77
logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"
88
"github.com/openshift/cluster-logging-operator/internal/constants"
9-
"github.com/openshift/cluster-logging-operator/internal/generator"
109
. "github.com/openshift/cluster-logging-operator/internal/generator"
1110
genhelper "github.com/openshift/cluster-logging-operator/internal/generator/helpers"
1211
. "github.com/openshift/cluster-logging-operator/internal/generator/vector/elements"
@@ -105,22 +104,7 @@ func (l LokiLabels) Template() string {
105104
func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element {
106105
if genhelper.IsDebugOutput(op) {
107106
return []Element{
108-
generator.ConfLiteral{
109-
Desc: "Sending records to stdout for debug purposes",
110-
ComponentID: strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)),
111-
InLabel: vectorhelpers.MakeInputs(inputs...),
112-
TemplateName: "lokidebug",
113-
TemplateStr: `
114-
{{define "lokidebug" -}}
115-
[sinks.{{.ComponentID}}]
116-
inputs = {{.InLabel}}
117-
type = "console"
118-
target = "stdout"
119-
[sinks.{{.ComponentID}}.encoding]
120-
codec = "json"
121-
{{end}}
122-
`,
123-
},
107+
Debug(strings.ToLower(vectorhelpers.Replacer.Replace(o.Name)), vectorhelpers.MakeInputs(inputs...)),
124108
}
125109
}
126110
return MergeElements(

internal/generator/vector/sources_to_inputs.go

+51-24
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package vector
22

33
import (
44
"fmt"
5-
"strings"
5+
"sort"
66

77
logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"
88
"github.com/openshift/cluster-logging-operator/internal/generator"
@@ -11,24 +11,39 @@ import (
1111
)
1212

1313
const (
14-
IsInfraContainer = `starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default"`
15-
IsNamespaceLog = `(.kubernetes.pod_namespace == "%s")`
14+
NsKube = "kube"
15+
NsOpenshift = "openshift"
16+
NsDefault = "default"
17+
18+
K8sPodNamespace = ".kubernetes.pod_namespace"
19+
K8sLabelKeyExpr = ".kubernetes.pod_labels.%s"
20+
21+
InputContainerLogs = "container_logs"
22+
InputJournalLogs = "journal_logs"
23+
RouteApplicationLogs = "route_application_logs"
1624

1725
SrcPassThrough = "."
1826
)
1927

2028
var (
21-
AddLogTypeApp = fmt.Sprintf(".log_type = %q", logging.InputNameApplication)
22-
AddLogTypeInfra = fmt.Sprintf(".log_type = %q", logging.InputNameInfrastructure)
23-
AddLogTypeAudit = fmt.Sprintf(".log_type = %q", logging.InputNameAudit)
24-
InfraContainerLogsExpr = fmt.Sprintf(`'%s'`, IsInfraContainer)
25-
AppContainerLogsExpr = fmt.Sprintf(`'!(%s)'`, IsInfraContainer)
26-
InputContainerLogs = "container_logs"
27-
InputJournalLogs = "journal_logs"
28-
RouteApplicationLogs = "route_application_logs"
29+
InfraContainerLogs = OR(
30+
StartWith(K8sPodNamespace, NsKube),
31+
StartWith(K8sPodNamespace, NsOpenshift),
32+
Eq(K8sPodNamespace, NsDefault))
33+
AppContainerLogs = Neg(Paren(InfraContainerLogs))
34+
35+
AddLogTypeApp = fmt.Sprintf(".log_type = %q", logging.InputNameApplication)
36+
AddLogTypeInfra = fmt.Sprintf(".log_type = %q", logging.InputNameInfrastructure)
37+
AddLogTypeAudit = fmt.Sprintf(".log_type = %q", logging.InputNameAudit)
2938

30-
OR = func(nsExpr ...string) string {
31-
return fmt.Sprintf("'%s'", strings.Join(nsExpr, " || "))
39+
MatchNS = func(ns string) string {
40+
return Eq(K8sPodNamespace, ns)
41+
}
42+
K8sLabelKey = func(k string) string {
43+
return fmt.Sprintf(K8sLabelKeyExpr, k)
44+
}
45+
MatchLabel = func(k, v string) string {
46+
return Eq(K8sLabelKey(k), v)
3247
}
3348
)
3449

@@ -45,19 +60,18 @@ func SourcesToInputs(spec *logging.ClusterLogForwarderSpec, o generator.Options)
4560
Routes: map[string]string{},
4661
}
4762
if types.Has(logging.InputNameApplication) {
48-
r.Routes["app"] = AppContainerLogsExpr
63+
r.Routes["app"] = Quote(AppContainerLogs)
4964
}
5065
if types.Has(logging.InputNameInfrastructure) {
51-
r.Routes["infra"] = InfraContainerLogsExpr
66+
r.Routes["infra"] = Quote(InfraContainerLogs)
5267
}
53-
//TODO Add handling of user-defined inputs
5468
el = append(el, r)
5569
}
5670

5771
if types.Has(logging.InputNameApplication) {
5872
r := Remap{
5973
Desc: `Rename log stream to "application"`,
60-
ComponentID: "application",
74+
ComponentID: logging.InputNameApplication,
6175
Inputs: helpers.MakeInputs("route_container_logs.app"),
6276
VRL: AddLogTypeApp,
6377
}
@@ -66,7 +80,7 @@ func SourcesToInputs(spec *logging.ClusterLogForwarderSpec, o generator.Options)
6680
if types.Has(logging.InputNameInfrastructure) {
6781
r := Remap{
6882
Desc: `Rename log stream to "infrastructure"`,
69-
ComponentID: "infrastructure",
83+
ComponentID: logging.InputNameInfrastructure,
7084
Inputs: helpers.MakeInputs("route_container_logs.infra", InputJournalLogs),
7185
VRL: AddLogTypeInfra,
7286
}
@@ -75,30 +89,43 @@ func SourcesToInputs(spec *logging.ClusterLogForwarderSpec, o generator.Options)
7589
if types.Has(logging.InputNameAudit) {
7690
r := Remap{
7791
Desc: `Rename log stream to "audit"`,
78-
ComponentID: "audit",
92+
ComponentID: logging.InputNameAudit,
7993
Inputs: helpers.MakeInputs("host_audit_logs", "k8s_audit_logs", "openshift_audit_logs"),
8094
VRL: AddLogTypeAudit,
8195
}
8296
el = append(el, r)
8397
}
84-
//TODO add labels based routing
8598
userDefined := spec.InputMap()
8699
for _, pipeline := range spec.Pipelines {
87100
for _, inRef := range pipeline.InputRefs {
88101
if input, ok := userDefined[inRef]; ok {
89102
// user defined input
90103
if input.Application != nil {
91104
app := input.Application
105+
matchNS := []string{}
92106
if len(app.Namespaces) != 0 {
93-
matchNS := []string{}
94107
for _, ns := range app.Namespaces {
95-
matchNS = append(matchNS, fmt.Sprintf(IsNamespaceLog, ns))
108+
matchNS = append(matchNS, MatchNS(ns))
96109
}
110+
}
111+
matchLabels := []string{}
112+
if app.Selector != nil && len(app.Selector.MatchLabels) != 0 {
113+
labels := app.Selector.MatchLabels
114+
keys := make([]string, 0, len(labels))
115+
for k := range labels {
116+
keys = append(keys, k)
117+
}
118+
sort.Strings(keys)
119+
for _, k := range keys {
120+
matchLabels = append(matchLabels, MatchLabel(k, labels[k]))
121+
}
122+
}
123+
if len(matchNS) != 0 || len(matchLabels) != 0 {
97124
el = append(el, Route{
98125
ComponentID: RouteApplicationLogs,
99-
Inputs: helpers.MakeInputs("application"),
126+
Inputs: helpers.MakeInputs(logging.InputNameApplication),
100127
Routes: map[string]string{
101-
input.Name: OR(matchNS...),
128+
input.Name: Quote(AND(OR(matchNS...), AND(matchLabels...))),
102129
},
103130
})
104131
}

internal/generator/vector/sources_to_pipelines_test.go

+60-5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"
88
"github.com/openshift/cluster-logging-operator/internal/generator"
99
corev1 "k8s.io/api/core/v1"
10+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1011
)
1112

1213
var _ = Describe("Testing Config Generation", func() {
@@ -35,8 +36,8 @@ var _ = Describe("Testing Config Generation", func() {
3536
[transforms.route_container_logs]
3637
type = "route"
3738
inputs = ["container_logs"]
38-
route.app = '!(starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default")'
39-
route.infra = 'starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default"'
39+
route.app = '!((starts_with!(.kubernetes.pod_namespace,"kube")) || (starts_with!(.kubernetes.pod_namespace,"openshift")) || (.kubernetes.pod_namespace == "default"))'
40+
route.infra = '(starts_with!(.kubernetes.pod_namespace,"kube")) || (starts_with!(.kubernetes.pod_namespace,"openshift")) || (.kubernetes.pod_namespace == "default")'
4041
4142
4243
# Rename log stream to "application"
@@ -99,8 +100,8 @@ source = """
99100
[transforms.route_container_logs]
100101
type = "route"
101102
inputs = ["container_logs"]
102-
route.app = '!(starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default")'
103-
route.infra = 'starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default"'
103+
route.app = '!((starts_with!(.kubernetes.pod_namespace,"kube")) || (starts_with!(.kubernetes.pod_namespace,"openshift")) || (.kubernetes.pod_namespace == "default"))'
104+
route.infra = '(starts_with!(.kubernetes.pod_namespace,"kube")) || (starts_with!(.kubernetes.pod_namespace,"openshift")) || (.kubernetes.pod_namespace == "default")'
104105
105106
106107
# Rename log stream to "application"
@@ -168,7 +169,7 @@ source = """
168169
[transforms.route_container_logs]
169170
type = "route"
170171
inputs = ["container_logs"]
171-
route.app = '!(starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default")'
172+
route.app = '!((starts_with!(.kubernetes.pod_namespace,"kube")) || (starts_with!(.kubernetes.pod_namespace,"openshift")) || (.kubernetes.pod_namespace == "default"))'
172173
173174
174175
# Rename log stream to "application"
@@ -186,6 +187,60 @@ inputs = ["application"]
186187
route.myapplogs = '(.kubernetes.pod_namespace == "test-ns1") || (.kubernetes.pod_namespace == "test-ns2")'
187188
188189
190+
[transforms.pipeline]
191+
type = "remap"
192+
inputs = ["route_application_logs.myapplogs"]
193+
source = """
194+
.
195+
"""
196+
`,
197+
}),
198+
Entry("Route Logs by Namespaces(s), and Labels(s)", generator.ConfGenerateTest{
199+
CLFSpec: logging.ClusterLogForwarderSpec{
200+
Inputs: []logging.InputSpec{
201+
{
202+
Name: "myapplogs",
203+
Application: &logging.Application{
204+
Namespaces: []string{"myapp1", "myapp2"},
205+
Selector: &v1.LabelSelector{
206+
MatchLabels: map[string]string{
207+
"key1": "value1",
208+
"key2": "value2",
209+
},
210+
},
211+
},
212+
},
213+
},
214+
Pipelines: []logging.PipelineSpec{
215+
{
216+
InputRefs: []string{"myapplogs"},
217+
OutputRefs: []string{logging.OutputNameDefault},
218+
Name: "pipeline",
219+
},
220+
},
221+
},
222+
ExpectedConf: `
223+
[transforms.route_container_logs]
224+
type = "route"
225+
inputs = ["container_logs"]
226+
route.app = '!((starts_with!(.kubernetes.pod_namespace,"kube")) || (starts_with!(.kubernetes.pod_namespace,"openshift")) || (.kubernetes.pod_namespace == "default"))'
227+
228+
229+
# Rename log stream to "application"
230+
[transforms.application]
231+
type = "remap"
232+
inputs = ["route_container_logs.app"]
233+
source = """
234+
.log_type = "application"
235+
"""
236+
237+
238+
[transforms.route_application_logs]
239+
type = "route"
240+
inputs = ["application"]
241+
route.myapplogs = '((.kubernetes.pod_namespace == "myapp1") || (.kubernetes.pod_namespace == "myapp2")) && ((.kubernetes.pod_labels.key1 == "value1") && (.kubernetes.pod_labels.key2 == "value2"))'
242+
243+
189244
[transforms.pipeline]
190245
type = "remap"
191246
inputs = ["route_application_logs.myapplogs"]

internal/generator/vector/vrl.go

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package vector
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
)
7+
8+
var (
9+
SkipEmpty = func(in []string) []string {
10+
vals := []string{}
11+
for _, v := range in {
12+
if strings.TrimSpace(v) != "" {
13+
vals = append(vals, v)
14+
}
15+
}
16+
return vals
17+
}
18+
Paren = func(in string) string {
19+
return fmt.Sprintf("(%s)", in)
20+
}
21+
ParenAll = func(in []string) []string {
22+
if len(in) == 1 {
23+
return in
24+
}
25+
vals := []string{}
26+
for _, v := range in {
27+
vals = append(vals, Paren(v))
28+
}
29+
return vals
30+
}
31+
StartWith = func(x, y string) string {
32+
return fmt.Sprintf("starts_with!(%s,%q)", x, y)
33+
}
34+
Eq = func(x, y string) string {
35+
return fmt.Sprintf("%s == %q", x, y)
36+
}
37+
Quote = func(expr string) string {
38+
return fmt.Sprintf("'%s'", expr)
39+
}
40+
OR = func(nsExpr ...string) string {
41+
return strings.Join(ParenAll(SkipEmpty(nsExpr)), " || ")
42+
}
43+
AND = func(nsExpr ...string) string {
44+
return strings.Join(ParenAll(SkipEmpty(nsExpr)), " && ")
45+
}
46+
Neg = func(expr string) string {
47+
return fmt.Sprintf("!%s", expr)
48+
}
49+
)

0 commit comments

Comments
 (0)