Skip to content

Commit 0f501fb

Browse files
committed
tmp
1 parent caf7ac2 commit 0f501fb

25 files changed

+1069
-430
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/flowlogs-pipeline
22
/confgenerator
3+
/k8s-cache
34
/bin/
45
cover.out

Makefile

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,17 @@ IMAGE_TAG_BASE ?= quay.io/$(IMAGE_ORG)/flowlogs-pipeline
3939

4040
# Image URL to use all building/pushing image targets
4141
IMAGE ?= $(IMAGE_TAG_BASE):$(VERSION)
42+
IMAGE_CACHE ?= $(IMAGE_TAG_BASE)-cache:$(VERSION)
4243
OCI_BUILD_OPTS ?=
4344

4445
# Image building tool (docker / podman) - docker is preferred in CI
4546
OCI_BIN_PATH = $(shell which docker 2>/dev/null || which podman)
46-
OCI_BIN ?= $(shell basename ${OCI_BIN_PATH})
47+
OCI_BIN ?= $(shell basename ${OCI_BIN_PATH} 2>/dev/null)
4748

4849
MIN_GO_VERSION := 1.20.0
4950
FLP_BIN_FILE=flowlogs-pipeline
5051
CG_BIN_FILE=confgenerator
52+
K8S_CACHE_BIN_FILE=k8s-cache
5153
NETFLOW_GENERATOR=nflow-generator
5254
CMD_DIR=./cmd/
5355
FLP_CONF_FILE ?= contrib/kubernetes/flowlogs-pipeline.conf.yaml
@@ -61,18 +63,21 @@ FORCE: ;
6163
define build_target
6264
echo 'building image for arch $(1)'; \
6365
DOCKER_BUILDKIT=1 $(OCI_BIN) buildx build --load --build-arg TARGETPLATFORM=linux/$(1) --build-arg TARGETARCH=$(1) --build-arg BUILDPLATFORM=linux/amd64 ${OCI_BUILD_OPTS} -t ${IMAGE}-$(1) -f contrib/docker/Dockerfile .;
66+
DOCKER_BUILDKIT=1 $(OCI_BIN) buildx build --load --build-arg TARGETPLATFORM=linux/$(1) --build-arg TARGETARCH=$(1) --build-arg BUILDPLATFORM=linux/amd64 ${OCI_BUILD_OPTS} -t ${IMAGE_CACHE}-$(1) -f contrib/docker/cache.Dockerfile .;
6467
endef
6568

6669
# push a single arch target image
6770
define push_target
6871
echo 'pushing image ${IMAGE}-$(1)'; \
6972
DOCKER_BUILDKIT=1 $(OCI_BIN) push ${IMAGE}-$(1);
73+
DOCKER_BUILDKIT=1 $(OCI_BIN) push ${IMAGE_CACHE}-$(1);
7074
endef
7175

7276
# manifest create a single arch target provided as argument
7377
define manifest_add_target
7478
echo 'manifest add target $(1)'; \
7579
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest add ${IMAGE} ${IMAGE}-$(1);
80+
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest add ${IMAGE_CACHE} ${IMAGE_CACHE}-$(1);
7681
endef
7782

7883
##@ General
@@ -114,8 +119,12 @@ build_code:
114119
GOARCH=${GOARCH} go build -ldflags "-X 'main.BuildVersion=$(BUILD_VERSION)' -X 'main.BuildDate=$(BUILD_DATE)'" "${CMD_DIR}${FLP_BIN_FILE}"
115120
GOARCH=${GOARCH} go build -ldflags "-X 'main.BuildVersion=$(BUILD_VERSION)' -X 'main.BuildDate=$(BUILD_DATE)'" "${CMD_DIR}${CG_BIN_FILE}"
116121

122+
.PHONY: build_k8s_cache
123+
build_k8s_cache:
124+
GOARCH=${GOARCH} go build -ldflags "-X 'main.BuildVersion=$(BUILD_VERSION)' -X 'main.BuildDate=$(BUILD_DATE)'" "${CMD_DIR}${K8S_CACHE_BIN_FILE}"
125+
117126
.PHONY: build
118-
build: validate_go lint build_code docs ## Build flowlogs-pipeline executable and update the docs
127+
build: validate_go lint build_code build_k8s_cache docs ## Build flowlogs-pipeline executables and update the docs
119128

120129
.PHONY: docs
121130
docs: FORCE ## Update flowlogs-pipeline documentation
@@ -187,16 +196,20 @@ image-push: ## Push MULTIARCH_TARGETS images
187196
.PHONY: manifest-build
188197
manifest-build: ## Build MULTIARCH_TARGETS manifest
189198
@echo 'building manifest $(IMAGE)'
190-
DOCKER_BUILDKIT=1 $(OCI_BIN) rmi ${IMAGE} -f
199+
DOCKER_BUILDKIT=1 $(OCI_BIN) rmi ${IMAGE} -f || true
200+
DOCKER_BUILDKIT=1 $(OCI_BIN) rmi ${IMAGE_CACHE} -f || true
191201
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest create ${IMAGE} $(foreach target,$(MULTIARCH_TARGETS), --amend ${IMAGE}-$(target));
202+
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest create ${IMAGE_CACHE} $(foreach target,$(MULTIARCH_TARGETS), --amend ${IMAGE_CACHE}-$(target));
192203

193204
.PHONY: manifest-push
194205
manifest-push: ## Push MULTIARCH_TARGETS manifest
195206
@echo 'publish manifest $(IMAGE)'
196207
ifeq (${OCI_BIN}, docker)
197208
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE};
209+
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE_CACHE};
198210
else
199211
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE} docker://${IMAGE};
212+
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE_CACHE} docker://${IMAGE_CACHE};
200213
endif
201214

202215
include .mk/development.mk

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -924,7 +924,7 @@ General
924924
925925
Develop
926926
lint Lint the code
927-
build Build flowlogs-pipeline executable and update the docs
927+
build Build flowlogs-pipeline executables and update the docs
928928
docs Update flowlogs-pipeline documentation
929929
clean Clean
930930
tests-unit Unit tests

cmd/k8s-cache/main.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"os"
7+
8+
"github.com/netobserv/flowlogs-pipeline/pkg/api"
9+
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes"
10+
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
11+
"github.com/sirupsen/logrus"
12+
"gopkg.in/yaml.v2"
13+
)
14+
15+
var (
16+
buildVersion = "unknown"
17+
buildDate = "unknown"
18+
app = "flp-cache"
19+
configPath = flag.String("config", "", "path to a config file")
20+
versionFlag = flag.Bool("v", false, "print version")
21+
log = logrus.WithField("module", "main")
22+
)
23+
24+
type Config struct {
25+
KubeConfigPath string `yaml:"kubeConfigPath"`
26+
KafkaConfig api.EncodeKafka `yaml:"kafkaConfig"`
27+
PProfPort int32 `yaml:"pprofPort"` // TODO: manage pprof
28+
LogLevel string `yaml:"logLevel"`
29+
}
30+
31+
func main() {
32+
flag.Parse()
33+
34+
appVersion := fmt.Sprintf("%s [build version: %s, build date: %s]", app, buildVersion, buildDate)
35+
if *versionFlag {
36+
fmt.Println(appVersion)
37+
os.Exit(0)
38+
}
39+
40+
cfg, err := readConfig(*configPath)
41+
if err != nil {
42+
log.WithError(err).Fatal("error reading config file")
43+
}
44+
45+
lvl, err := logrus.ParseLevel(cfg.LogLevel)
46+
if err != nil {
47+
log.Errorf("Log level %s not recognized, using info", cfg.LogLevel)
48+
lvl = logrus.InfoLevel
49+
}
50+
logrus.SetLevel(lvl)
51+
log.Infof("Starting %s at log level %s", appVersion, lvl)
52+
log.Infof("Configuration: %#v", cfg)
53+
54+
err = kubernetes.InitInformerDatasource(cfg.KubeConfigPath, &cfg.KafkaConfig)
55+
if err != nil {
56+
log.WithError(err).Fatal("error initializing Kubernetes & informers")
57+
}
58+
59+
stopCh := utils.SetupElegantExit()
60+
<-stopCh
61+
}
62+
63+
func readConfig(path string) (*Config, error) {
64+
var cfg Config
65+
if len(path) == 0 {
66+
return &cfg, nil
67+
}
68+
yamlFile, err := os.ReadFile(path)
69+
if err != nil {
70+
return nil, err
71+
}
72+
err = yaml.Unmarshal(yamlFile, &cfg)
73+
if err != nil {
74+
return nil, err
75+
}
76+
77+
return &cfg, err
78+
}

cmd/k8s-cache/main_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (C) 2021 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package main
19+
20+
import (
21+
"encoding/json"
22+
"errors"
23+
"os"
24+
"os/exec"
25+
"testing"
26+
27+
"github.com/stretchr/testify/require"
28+
29+
"github.com/netobserv/flowlogs-pipeline/pkg/config"
30+
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
31+
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes"
32+
)
33+
34+
func TestTheMain(t *testing.T) {
35+
if os.Getenv("BE_CRASHER") == "1" {
36+
main()
37+
return
38+
}
39+
cmd := exec.Command(os.Args[0], "-test.run=TestTheMain")
40+
cmd.Env = append(os.Environ(), "BE_CRASHER=1")
41+
err := cmd.Run()
42+
var castErr *exec.ExitError
43+
if errors.As(err, &castErr) && !castErr.Success() {
44+
return
45+
}
46+
t.Fatalf("process ran with err %v, want exit status 1", err)
47+
}
48+
49+
func TestPipelineConfigSetup(t *testing.T) {
50+
// Kube init mock
51+
kubernetes.MockInformers()
52+
53+
js := `{
54+
"PipeLine": "[{\"name\":\"grpc\"},{\"follows\":\"grpc\",\"name\":\"enrich\"},{\"follows\":\"enrich\",\"name\":\"loki\"},{\"follows\":\"enrich\",\"name\":\"prometheus\"}]",
55+
"Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"kubernetes\":{\"input\":\"SrcAddr\",\"output\":\"SrcK8S\"},\"type\":\"add_kubernetes\"},{\"kubernetes\":{\"input\":\"DstAddr\",\"output\":\"DstK8S\"},\"type\":\"add_kubernetes\"},{\"add_service\":{\"input\":\"DstPort\",\"output\":\"Service\",\"protocol\":\"Proto\"},\"type\":\"add_service\"},{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"subnet_mask\":\"/16\"},\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]",
56+
"Health": {
57+
"Port": "8080"
58+
},
59+
"Profile": {
60+
"Port": 0
61+
}
62+
}`
63+
var opts config.Options
64+
err := json.Unmarshal([]byte(js), &opts)
65+
require.NoError(t, err)
66+
cfg, err := config.ParseConfig(&opts)
67+
require.NoError(t, err)
68+
require.NotNil(t, cfg)
69+
mainPipeline, err := pipeline.NewPipeline(&cfg)
70+
require.NoError(t, err)
71+
require.NotNil(t, mainPipeline)
72+
}

contrib/docker/cache.Dockerfile

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# We do not use --platform feature to auto fill this ARG because of incompatibility between podman and docker
2+
ARG TARGETPLATFORM=linux/amd64
3+
ARG BUILDPLATFORM=linux/amd64
4+
FROM --platform=$BUILDPLATFORM docker.io/library/golang:1.22 as builder
5+
6+
ARG TARGETPLATFORM
7+
ARG TARGETARCH=amd64
8+
WORKDIR /app
9+
10+
# Copy source code
11+
COPY go.mod .
12+
COPY go.sum .
13+
COPY Makefile .
14+
COPY .mk/ .mk/
15+
COPY .bingo/ .bingo/
16+
COPY vendor/ vendor/
17+
COPY .git/ .git/
18+
COPY cmd/ cmd/
19+
COPY pkg/ pkg/
20+
21+
RUN git status --porcelain
22+
RUN GOARCH=$TARGETARCH make build_k8s_cache
23+
24+
# final stage
25+
FROM --platform=$TARGETPLATFORM registry.access.redhat.com/ubi9/ubi-minimal:9.4
26+
27+
COPY --from=builder /app/k8s-cache /app/
28+
29+
ENTRYPOINT ["/app/k8s-cache"]

docs/api.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,32 @@ Following is the supported API format for network transformations:
247247
output: entry output field
248248
protocol: entry protocol field
249249
kubeConfigPath: path to kubeconfig file (optional)
250+
kafkaCacheConfig: Kafka config for informers cache (optional)
251+
brokers: list of kafka broker addresses
252+
topic: kafka topic to listen on
253+
groupid: separate groupid for each consumer on specified topic
254+
groupBalancers: list of balancing strategies (range, roundRobin, rackAffinity)
255+
startOffset: FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition
256+
batchReadTimeout: how often (in milliseconds) to process input
257+
decoder: decoder to use (E.g. json or protobuf)
258+
type: (enum) one of the following:
259+
json: JSON decoder
260+
protobuf: Protobuf decoder
261+
batchMaxLen: the number of accumulated flows before being forwarded for processing
262+
pullQueueCapacity: the capacity of the queue use to store pulled flows
263+
pullMaxBytes: the maximum number of bytes being pulled from kafka
264+
commitInterval: the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously.
265+
tls: TLS client configuration (optional)
266+
insecureSkipVerify: skip client verifying the server's certificate chain and host name
267+
caCertPath: path to the CA certificate
268+
userCertPath: path to the user certificate
269+
userKeyPath: path to the user private key
270+
sasl: SASL configuration (optional)
271+
type: SASL type
272+
plain: Plain SASL
273+
scramSHA512: SCRAM/SHA512 SASL
274+
clientIDPath: path to the client ID / SASL username
275+
clientSecretPath: path to the client secret / SASL password
250276
servicesFile: path to services file (optional, default: /etc/services)
251277
protocolsFile: path to protocols file (optional, default: /etc/protocols)
252278
subnetLabels: configure subnet and IPs custom labels

pkg/api/transform_network.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@
1818
package api
1919

2020
type TransformNetwork struct {
21-
Rules NetworkTransformRules `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
22-
KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
23-
ServicesFile string `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"`
24-
ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"`
25-
SubnetLabels []NetworkTransformSubnetLabel `yaml:"subnetLabels,omitempty" json:"subnetLabels,omitempty" doc:"configure subnet and IPs custom labels"`
26-
DirectionInfo NetworkTransformDirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"`
21+
Rules NetworkTransformRules `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
22+
KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
23+
KafkaCacheConfig *IngestKafka `yaml:"kafkaCacheConfig,omitempty" json:"kafkaCacheConfig,omitempty" doc:"Kafka config for informers cache (optional)"`
24+
ServicesFile string `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"`
25+
ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"`
26+
SubnetLabels []NetworkTransformSubnetLabel `yaml:"subnetLabels,omitempty" json:"subnetLabels,omitempty" doc:"configure subnet and IPs custom labels"`
27+
DirectionInfo NetworkTransformDirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"`
2728
}
2829

2930
func (tn *TransformNetwork) GetServiceFiles() (string, string) {

0 commit comments

Comments
 (0)