Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
.PHONY: all serverless deps docker docker-cgo clean docs test test-race test-integration fmt lint install deploy-docs playground
.PHONY: all serverless deps docker docker-cgo clean docs test test-race test-integration fmt lint install deploy-docs playground proto install-protoc-plugins

TAGS ?=

GO ?= go
GOPATH_BIN := $(shell $(GO) env GOPATH)/bin
GOMAXPROCS ?= 1
INSTALL_DIR ?= $(GOPATH)/bin
WEBSITE_DIR ?= ./website
Expand All @@ -12,7 +13,7 @@ PATHINSTSERVERLESS = $(DEST_DIR)/serverless
PATHINSTDOCKER = $(DEST_DIR)/docker
DOCKER_IMAGE ?= ghcr.io/warpstreamlabs/bento

VERSION := $(shell git describe --tags || echo "v0.0.0")
VERSION := $(shell git describe --tags 2>/dev/null || echo "v0.0.0")
VER_CUT := $(shell echo $(VERSION) | cut -c2-)
VER_MAJOR := $(shell echo $(VER_CUT) | cut -f1 -d.)
VER_MINOR := $(shell echo $(VER_CUT) | cut -f2 -d.)
Expand All @@ -26,6 +27,11 @@ LD_FLAGS ?= -w -s
GO_FLAGS ?=
DOCS_FLAGS ?=

# Protobuf generation
PROTOC ?= protoc
PROTO_SRC_DIR := internal/impl/grpc/proto
PROTO_GO_OUT_DIR := internal/impl/grpc/pb/proto

APPS = bento
all: $(APPS)

Expand Down Expand Up @@ -116,6 +122,23 @@ docs: $(APPS) $(TOOLS)
"$(WEBSITE_DIR)/docs/**/*.md"
@$(PATHINSTBIN)/bento template lint "./config/template_examples/*.yaml"

# -----------------------------------------------------------------------------
# Protobufs for gRPC plugin
# -----------------------------------------------------------------------------
install-protoc-plugins:
@echo "Installing protoc plugins..."
@GOBIN=$(GOPATH_BIN) $(GO) install google.golang.org/protobuf/cmd/protoc-gen-go@latest
@GOBIN=$(GOPATH_BIN) $(GO) install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
Comment on lines +130 to +131
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think we should pin these deps rather than using the latest tag


proto: install-protoc-plugins
@mkdir -p $(PROTO_GO_OUT_DIR)
@PATH="$(GOPATH_BIN):$$PATH" $(PROTOC) \
-I $(PROTO_SRC_DIR) \
--go_out=$(PROTO_GO_OUT_DIR) --go_opt=paths=source_relative \
--go-grpc_out=$(PROTO_GO_OUT_DIR) --go-grpc_opt=paths=source_relative \
$(PROTO_SRC_DIR)/ingest.proto


# HACK:(gregfurman): Change misc/wasm/wasm_exec.js => lib/wasm/wasm_exec.js when using Go 1.24
playground:
@cp -r "internal/cli/blobl/playground" "$(WEBSITE_DIR)/static/playground"
Expand Down
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ require (
github.com/microsoft/gocosmos v1.1.1
github.com/mitchellh/mapstructure v1.5.0
github.com/nats-io/nats.go v1.32.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nkeys v0.4.11
github.com/nats-io/stan.go v0.10.4
github.com/nsf/jsondiff v0.0.0-20230430225905-43f6cf3098c1
github.com/nsqio/go-nsq v1.1.0
Expand Down Expand Up @@ -171,11 +171,15 @@ require (
github.com/apache/arrow-go/v18 v18.0.0 // indirect
github.com/go-zeromq/goczmq/v4 v4.2.2 // indirect
github.com/hamba/avro/v2 v2.26.0 // indirect
github.com/hashicorp/go-hclog v1.6.3 // indirect
github.com/hashicorp/raft v1.7.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/minio/highwayhash v1.0.3 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/jwt/v2 v2.8.0 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f // indirect
modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect
Expand Down Expand Up @@ -220,7 +224,6 @@ require (
github.com/apache/arrow/go/v15 v15.0.2 // indirect
github.com/apache/thrift v0.21.0 // indirect
github.com/ardielle/ardielle-go v1.5.2 // indirect
github.com/armon/go-metrics v0.3.4 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.12.17
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect
Expand Down
63 changes: 23 additions & 40 deletions go.sum

Large diffs are not rendered by default.

151 changes: 151 additions & 0 deletions internal/impl/grpc/grpc_dialopts.go
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we need this builder pattern - can't we just create a []grpc.DialOption in the constructor?

Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package grpc

import (
"context"
"crypto/tls"
"fmt"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
)

type authConfig struct {
bearerToken string
headers map[string]string
}

type keepaliveConfig struct {
time time.Duration
timeout time.Duration
permitWithoutStream bool
}

type clientOpts struct {
authority string
userAgent string
loadBalancingPolicy string
maxSendMsgBytes int
maxRecvMsgBytes int
keepalive keepaliveConfig
auth authConfig
}

func buildDialOptions(_ context.Context, useTLS bool, cfg clientOpts) ([]grpc.DialOption, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come we have a ctx in the call signature if we aren't using it?

return newDialOptionsBuilder(useTLS, cfg).
withTransport().
withAuthority().
withUserAgent().
withLBPolicy().
withKeepalive().
withMsgSizeOpts().
withPerRPCCreds().
build()
}

// dialOptionsBuilder constructs grpc.DialOptions using a fluent API for readability.
type dialOptionsBuilder struct {
useTLS bool
cfg clientOpts
opts []grpc.DialOption
callOpts []grpc.CallOption
}

func newDialOptionsBuilder(useTLS bool, cfg clientOpts) *dialOptionsBuilder {
return &dialOptionsBuilder{
useTLS: useTLS,
cfg: cfg,
}
}

func (b *dialOptionsBuilder) withTransport() *dialOptionsBuilder {
if b.useTLS {
// Use default system roots; for advanced CA/mTLS provide a custom config downstream.
tlsCfg := &tls.Config{MinVersion: tls.VersionTLS12}
b.opts = append(b.opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg)))
Comment on lines +66 to +67
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole tls setup is really confusing - seems like we use tlsConf, tlsEnabled, _ := conf.FieldTLSToggled("tls") in the constructor but then here we are just replacing the tlsConf with &tls.Config{MinVersion: tls.VersionTLS12}.

} else {
b.opts = append(b.opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
return b
}

func (b *dialOptionsBuilder) withAuthority() *dialOptionsBuilder {
if b.cfg.authority != "" {
b.opts = append(b.opts, grpc.WithAuthority(b.cfg.authority))
}
return b
}

func (b *dialOptionsBuilder) withUserAgent() *dialOptionsBuilder {
if b.cfg.userAgent != "" {
b.opts = append(b.opts, grpc.WithUserAgent(b.cfg.userAgent))
}
return b
}

func (b *dialOptionsBuilder) withLBPolicy() *dialOptionsBuilder {
if b.cfg.loadBalancingPolicy != "" {
b.opts = append(b.opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, b.cfg.loadBalancingPolicy)))
}
return b
}

func (b *dialOptionsBuilder) withKeepalive() *dialOptionsBuilder {
if b.cfg.keepalive.time > 0 || b.cfg.keepalive.timeout > 0 || b.cfg.keepalive.permitWithoutStream {
b.opts = append(b.opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: b.cfg.keepalive.time,
Timeout: b.cfg.keepalive.timeout,
PermitWithoutStream: b.cfg.keepalive.permitWithoutStream,
}))
}
return b
}

func (b *dialOptionsBuilder) withMsgSizeOpts() *dialOptionsBuilder {
if b.cfg.maxSendMsgBytes > 0 {
b.callOpts = append(b.callOpts, grpc.MaxCallSendMsgSize(b.cfg.maxSendMsgBytes))
}
if b.cfg.maxRecvMsgBytes > 0 {
b.callOpts = append(b.callOpts, grpc.MaxCallRecvMsgSize(b.cfg.maxRecvMsgBytes))
}
return b
}

func (b *dialOptionsBuilder) withPerRPCCreds() *dialOptionsBuilder {
if b.cfg.auth.bearerToken != "" || len(b.cfg.auth.headers) > 0 {
b.opts = append(b.opts, grpc.WithPerRPCCredentials(headerCreds{
token: b.cfg.auth.bearerToken,
headers: b.cfg.auth.headers,
secureOnly: true,
}))
}
return b
}

func (b *dialOptionsBuilder) build() ([]grpc.DialOption, error) {
if len(b.callOpts) > 0 {
b.opts = append(b.opts, grpc.WithDefaultCallOptions(b.callOpts...))
}
return b.opts, nil
}

type headerCreds struct {
token string
headers map[string]string
secureOnly bool
}

func (h headerCreds) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
md := map[string]string{}
if h.token != "" {
md["authorization"] = "Bearer " + h.token
}
for k, v := range h.headers {
md[k] = v
}
return md, nil
}

func (h headerCreds) RequireTransportSecurity() bool { return h.secureOnly }
Loading