Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions cmd/tools/grpc_test_server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package main

import (
"context"
"io"
"log"
"net"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"

structpb "google.golang.org/protobuf/types/known/structpb"
)

// Manual registration for bidi service using google.protobuf.Struct
func chatStreamHandler(_ interface{}, stream grpc.ServerStream) error {
for {
in := new(structpb.Struct)
if err := stream.RecvMsg(in); err != nil {
if err == io.EOF {
return nil
}
return err
}
// Enrich response with server metadata and timestamp so client logs are visible
enriched := map[string]interface{}{}
for k, v := range in.AsMap() {
enriched[k] = v
}
enriched["server"] = "grpc_test_server"
enriched["ts"] = time.Now().Format(time.RFC3339Nano)
enriched["note"] = "bidi echo"
out, _ := structpb.NewStruct(enriched)
if err := stream.SendMsg(out); err != nil {
return err
}
}
}

var chatServiceDesc = grpc.ServiceDesc{
ServiceName: "chat.Chat",
HandlerType: (*interface{})(nil),
Streams: []grpc.StreamDesc{{
StreamName: "Stream",
Handler: chatStreamHandler,
ServerStreams: true,
ClientStreams: true,
}},
}

// Server-stream-only echo service using Struct
func echoStreamHandler(_ interface{}, stream grpc.ServerStream) error {
in := new(structpb.Struct)
if err := stream.RecvMsg(in); err != nil {
if err == io.EOF {
return nil
}
return err
}
// emit a few echoes and finish
for i := 0; i < 3; i++ {
if err := stream.SendMsg(in); err != nil {
return err
}
time.Sleep(200 * time.Millisecond)
}
return nil
}

var echoServiceDesc = grpc.ServiceDesc{
ServiceName: "echo.Echo",
HandlerType: (*interface{})(nil),
Streams: []grpc.StreamDesc{{
StreamName: "Stream",
Handler: echoStreamHandler,
ServerStreams: true,
ClientStreams: false,
}},
}

// Client-stream-only ingest service: counts messages and returns a final result
func ingestStreamHandler(_ interface{}, stream grpc.ServerStream) error {
count := 0
for {
in := new(structpb.Struct)
if err := stream.RecvMsg(in); err != nil {
if err == io.EOF {
break
}
return err
}
count++
}
// respond once with count
out, _ := structpb.NewStruct(map[string]interface{}{"count": count})
return stream.SendMsg(out)
}

var ingestServiceDesc = grpc.ServiceDesc{
ServiceName: "ingest.Ingest",
HandlerType: (*interface{})(nil),
Streams: []grpc.StreamDesc{{
StreamName: "Stream",
Handler: ingestStreamHandler,
ServerStreams: false,
ClientStreams: true,
}},
}

func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("listen: %v", err)
}
s := grpc.NewServer()

// Health service
hs := health.NewServer()
// Set default service to SERVING
hs.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)

healthpb.RegisterHealthServer(s, hs)

// Reflection for dynamic clients
reflection.Register(s)

// Register manual chat service
s.RegisterService(&chatServiceDesc, nil)
// Register echo server-stream-only service
s.RegisterService(&echoServiceDesc, nil)
// Register ingest client-stream-only service
s.RegisterService(&ingestServiceDesc, nil)

log.Println("grpc test server listening on :50051")
if err := s.Serve(lis); err != nil {
log.Fatalf("serve: %v", err)
}
}

// Ensure unused import of context isn't optimized out in newer toolchains
var _ = context.Background
5 changes: 5 additions & 0 deletions cmd/tools/grpc_test_server/pb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Generate Go bindings (requires protoc and protoc-gen-go, protoc-gen-go-grpc):

protoc --go_out=. --go-grpc_out=. chat.proto


10 changes: 10 additions & 0 deletions cmd/tools/grpc_test_server/pb/chat.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
syntax = "proto3";
package chat;

import "google/protobuf/struct.proto";

service Chat {
rpc Stream(stream google.protobuf.Struct) returns (stream google.protobuf.Struct);
}


8 changes: 8 additions & 0 deletions cmd/tools/grpc_test_server/pb/echo.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
syntax = "proto3";
package echo;

import "google/protobuf/struct.proto";

service Echo {
rpc Stream(google.protobuf.Struct) returns (stream google.protobuf.Struct);
}
29 changes: 29 additions & 0 deletions cmd/tools/grpc_test_server/pb/google/protobuf/struct.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
syntax = "proto3";
package google.protobuf;

option go_package = "google.golang.org/protobuf/types/known/structpb";

message Struct {
map<string, Value> fields = 1;
}

message Value {
oneof kind {
NullValue null_value = 1;
double number_value = 2;
string string_value = 3;
bool bool_value = 4;
Struct struct_value = 5;
ListValue list_value = 6;
}
}

enum NullValue {
NULL_VALUE = 0;
}

message ListValue {
repeated Value values = 1;
}


25 changes: 25 additions & 0 deletions cmd/tools/grpc_test_server/pb/grpc/health/v1/health.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
syntax = "proto3";

package grpc.health.v1;

// Standard gRPC health checking protocol
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
}

message HealthCheckRequest {
string service = 1;
}

enum HealthCheckResponse_ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3;
}

message HealthCheckResponse {
HealthCheckResponse_ServingStatus status = 1;
}


8 changes: 8 additions & 0 deletions cmd/tools/grpc_test_server/pb/ingest.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
syntax = "proto3";
package ingest;

import "google/protobuf/struct.proto";

service Ingest {
rpc Stream(stream google.protobuf.Struct) returns (google.protobuf.Struct);
}
38 changes: 38 additions & 0 deletions config/examples/grpc_bidi_chat.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
input:
label: ""
generate:
interval: 1s
mapping: |
root = {
"session_id": "demo",
"message": "hello"
}

pipeline:
processors: []

output:
label: ""
grpc_client:
# minimal required fields
address: "127.0.0.1:50051"
method: "/chat.Chat/Stream"
rpc_type: "bidi"
proto_files:
- chat.proto
- google/protobuf/struct.proto
include_paths:
- cmd/tools/grpc_test_server/pb
# optional timeouts
call_timeout: "30s"
connect_timeout: "2s"
# optional retries
retry_max_attempts: 3
retry_initial_backoff: "200ms"
retry_max_backoff: "2s"
retry_backoff_multiplier: 2
# optional auth headers
# bearer_token: "your-token-here"
# auth_headers:
# x-api-key: "demo"

29 changes: 29 additions & 0 deletions config/examples/grpc_client_stream.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
input:
label: ""
generate:
count: 3
interval: 200ms
mapping: |
root = {}

pipeline:
processors: []

output:
label: ""
grpc_client:
address: "127.0.0.1:50051"
method: "/ingest.Ingest/Stream"
rpc_type: "client_stream"
proto_files:
- ingest.proto
- google/protobuf/struct.proto
include_paths:
- cmd/tools/grpc_test_server/pb
call_timeout: "30s"
connect_timeout: "2s"
retry_max_attempts: 2
retry_initial_backoff: "100ms"
retry_max_backoff: "1s"
retry_backoff_multiplier: 2

32 changes: 32 additions & 0 deletions config/examples/grpc_enhanced_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
input:
label: ""
generate:
count: 5
interval: 500ms
mapping: |
root = {
"value": this.count
}

pipeline:
processors: []

output:
label: ""
grpc_client:
address: "127.0.0.1:50051"
method: "/echo.Echo/Stream"
rpc_type: "unary"
# For outputs, provide the body via the input/generate mapping
proto_files:
- echo.proto
- google/protobuf/struct.proto
include_paths:
- cmd/tools/grpc_test_server/pb
call_timeout: "5s"
connect_timeout: "2s"
retry_max_attempts: 3
retry_initial_backoff: "200ms"
retry_max_backoff: "1s"
retry_backoff_multiplier: 2

35 changes: 35 additions & 0 deletions config/examples/grpc_insecure_example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
input:
generate:
count: 3
interval: 2s
mapping: 'root = { "request_id": uuid_v4(), "timestamp": now() }'

output:
type: grpc_client
grpc_client:
rpc_type: client_stream
address: 127.0.0.1:50051
method: /ingest.Ingest/Stream

# Example using insecure transport for local testing
proto_files: [ "ingest.proto", "google/protobuf/struct.proto" ]
include_paths: [ "cmd/tools/grpc_test_server/pb" ]

# Timeouts
call_timeout: 10s
connect_timeout: 2s

# gRPC best practices
propagate_deadlines: true

# Robust retry policy
retry_max_attempts: 3
retry_initial_backoff: 1s
retry_max_backoff: 10s
retry_backoff_multiplier: 2.0

# Authentication headers (optional)
auth_headers:
client-type: bento-insecure
environment: local
trace-enabled: "true"
Loading
Loading