From ff731de7afbe1155dd8bfd298b6746bd0e920601 Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Fri, 12 Sep 2025 04:15:31 -0400 Subject: [PATCH 01/12] Add service specific dial options in rpc factory --- common/resource/fx.go | 7 +++-- common/rpc/rpc.go | 36 ++++++++++++---------- common/rpc/test/http_test.go | 3 ++ common/rpc/test/rpc_localstore_tls_test.go | 24 +++++++-------- tests/testcore/onebox.go | 1 + 5 files changed, 40 insertions(+), 31 deletions(-) diff --git a/common/resource/fx.go b/common/resource/fx.go index 252f062aa52..8badc36761b 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -320,7 +320,7 @@ func SdkClientFactoryProvider( resolver *membership.GRPCResolver, dc *dynamicconfig.Collection, ) (sdk.ClientFactory, error) { - frontendURL, _, _, frontendTLSConfig, err := getFrontendConnectionDetails(cfg, tlsConfigProvider, resolver) + frontendURL, _, _, frontendTLSConfig, err := GetFrontendConnectionDetails(cfg, tlsConfigProvider, resolver) if err != nil { return nil, err } @@ -348,7 +348,7 @@ func RPCFactoryProvider( monitor membership.Monitor, dc *dynamicconfig.Collection, ) (common.RPCFactory, error) { - frontendURL, frontendHTTPURL, frontendHTTPPort, frontendTLSConfig, err := getFrontendConnectionDetails(cfg, tlsConfigProvider, resolver) + frontendURL, frontendHTTPURL, frontendHTTPPort, frontendTLSConfig, err := GetFrontendConnectionDetails(cfg, tlsConfigProvider, resolver) if err != nil { return nil, err } @@ -370,6 +370,7 @@ func RPCFactoryProvider( frontendHTTPPort, frontendTLSConfig, options, + map[primitives.ServiceName][]grpc.DialOption{}, monitor, ) factory.EnableInternodeServerKeepalive = enableServerKeepalive @@ -385,7 +386,7 @@ func FrontendHTTPClientCacheProvider( return cluster.NewFrontendHTTPClientCache(metadata, tlsConfigProvider) } -func getFrontendConnectionDetails( +func GetFrontendConnectionDetails( cfg *config.Config, tlsConfigProvider encryption.TLSConfigProvider, resolver *membership.GRPCResolver, diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index 5b119ee49e5..36257c99a85 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -42,10 +42,11 @@ type RPCFactory struct { frontendHTTPPort int frontendTLSConfig *tls.Config - grpcListener func() net.Listener - tlsFactory encryption.TLSConfigProvider - dialOptions []grpc.DialOption - monitor membership.Monitor + grpcListener func() net.Listener + tlsFactory encryption.TLSConfigProvider + dialOptions []grpc.DialOption + serviceDialOptions map[primitives.ServiceName][]grpc.DialOption + monitor membership.Monitor // A OnceValues wrapper for createLocalFrontendHTTPClient. localFrontendClient func() (*common.FrontendHTTPClient, error) interNodeGrpcConnections cache.Cache @@ -68,20 +69,22 @@ func NewFactory( frontendHTTPPort int, frontendTLSConfig *tls.Config, dialOptions []grpc.DialOption, + serviceDialOptions map[primitives.ServiceName][]grpc.DialOption, monitor membership.Monitor, ) *RPCFactory { f := &RPCFactory{ - config: cfg, - serviceName: sName, - logger: logger, - metricsHandler: metricsHandler, - frontendURL: frontendURL, - frontendHTTPURL: frontendHTTPURL, - frontendHTTPPort: frontendHTTPPort, - frontendTLSConfig: frontendTLSConfig, - tlsFactory: tlsProvider, - dialOptions: dialOptions, - monitor: monitor, + config: cfg, + serviceName: sName, + logger: logger, + metricsHandler: metricsHandler, + frontendURL: frontendURL, + frontendHTTPURL: frontendHTTPURL, + frontendHTTPPort: frontendHTTPPort, + frontendTLSConfig: frontendTLSConfig, + tlsFactory: tlsProvider, + dialOptions: dialOptions, + serviceDialOptions: serviceDialOptions, + monitor: monitor, } f.grpcListener = sync.OnceValue(f.createGRPCListener) f.localFrontendClient = sync.OnceValues(f.createLocalFrontendHTTPClient) @@ -237,7 +240,8 @@ func (d *RPCFactory) createInternodeGRPCConnection(hostName string, serviceName return nil } } - c := d.dial(hostName, tlsClientConfig, d.getClientKeepAliveConfig(serviceName)) + additionalDialOptions := append([]grpc.DialOption{}, d.serviceDialOptions[serviceName]...) + c := d.dial(hostName, tlsClientConfig, append(additionalDialOptions, d.getClientKeepAliveConfig(serviceName))...) d.interNodeGrpcConnections.Put(hostName, c) return c } diff --git a/common/rpc/test/http_test.go b/common/rpc/test/http_test.go index f362b0bede7..8101533fce3 100644 --- a/common/rpc/test/http_test.go +++ b/common/rpc/test/http_test.go @@ -41,6 +41,7 @@ func TestCreateLocalFrontendHTTPClient_UsingMembership(t *testing.T) { int(port), nil, // No TLS nil, + nil, monitor, ) @@ -72,6 +73,7 @@ func TestCreateLocalFrontendHTTPClient_UsingFixedHostPort(t *testing.T) { 0, // Port is unused nil, // No TLS nil, + nil, nil, // monitor should not be used ) @@ -104,6 +106,7 @@ func TestCreateLocalFrontendHTTPClient_UsingFixedHostPort_AndTLS(t *testing.T) { 0, // Port is unused tlsConfig, nil, + nil, nil, // monitor should not be used ) diff --git a/common/rpc/test/rpc_localstore_tls_test.go b/common/rpc/test/rpc_localstore_tls_test.go index cc31873c346..a05a8224e4e 100644 --- a/common/rpc/test/rpc_localstore_tls_test.go +++ b/common/rpc/test/rpc_localstore_tls_test.go @@ -110,7 +110,7 @@ func (s *localStoreRPCSuite) SetupSuite() { provider, err := encryption.NewTLSConfigProviderFromConfig(serverCfgInsecure.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - insecureFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil) + insecureFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil, nil) s.NotNil(insecureFactory) s.insecureRPCFactory = i(insecureFactory) @@ -320,26 +320,26 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err := provider.GetFrontendClientConfig() s.NoError(err) - frontendMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + frontendMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(frontendMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - frontendServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil) + frontendServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil, nil) s.NotNil(frontendServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSSystemWorker.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(frontendSystemWorkerMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - frontendMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + frontendMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(frontendMutualTLSRefreshFactory) s.frontendMutualTLSRPCFactory = f(frontendMutualTLSFactory) @@ -356,7 +356,7 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err = s.dynamicConfigProvider.GetFrontendClientConfig() s.NoError(err) - dynamicServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, s.dynamicConfigProvider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + dynamicServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, s.dynamicConfigProvider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.frontendDynamicTLSFactory = f(dynamicServerTLSFactory) s.internodeDynamicTLSFactory = i(dynamicServerTLSFactory) @@ -366,7 +366,7 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - frontendRootCAForceTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + frontendRootCAForceTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(frontendServerTLSFactory) s.frontendConfigRootCAForceTLSFactory = f(frontendRootCAForceTLSFactory) @@ -374,7 +374,7 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - remoteClusterMutualTLSRPCFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + remoteClusterMutualTLSRPCFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(remoteClusterMutualTLSRPCFactory) s.remoteClusterMutualTLSRPCFactory = r(remoteClusterMutualTLSRPCFactory) } @@ -412,28 +412,28 @@ func (s *localStoreRPCSuite) setupInternode() { s.NoError(err) tlsConfig, err := provider.GetFrontendClientConfig() s.NoError(err) - internodeMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + internodeMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(internodeMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - internodeServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + internodeServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(internodeServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreAltMutualTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - internodeMutualAltTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + internodeMutualAltTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(internodeMutualAltTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - internodeMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + internodeMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(internodeMutualTLSRefreshFactory) s.internodeMutualTLSRPCFactory = i(internodeMutualTLSFactory) diff --git a/tests/testcore/onebox.go b/tests/testcore/onebox.go index bba61f135ef..fc608d279ac 100644 --- a/tests/testcore/onebox.go +++ b/tests/testcore/onebox.go @@ -740,6 +740,7 @@ func (c *TemporalImpl) newRPCFactory( int(httpPort), frontendTLSConfig, options, + map[primitives.ServiceName][]grpc.DialOption{}, monitor, ), nil } From c642c004179a1dd63b4fc29d55e8ced71796844e Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Mon, 15 Sep 2025 16:11:11 -0400 Subject: [PATCH 02/12] Add additional options to FrontEnd Service and rename parameters --- common/resource/fx.go | 5 ++-- common/rpc/rpc.go | 54 +++++++++++++++++++++++++------------------ 2 files changed, 33 insertions(+), 26 deletions(-) diff --git a/common/resource/fx.go b/common/resource/fx.go index 8badc36761b..c35b0ee73e6 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -108,7 +108,7 @@ var Module = fx.Options( ) var DefaultOptions = fx.Options( - fx.Provide(RPCFactoryProvider), + fx.Provide(fx.Annotate(RPCFactoryProvider, fx.As(new(common.RPCFactory)))), fx.Provide(ArchivalMetadataProvider), fx.Provide(ArchiverProviderProvider), fx.Provide(ThrottledLoggerProvider), @@ -347,7 +347,7 @@ func RPCFactoryProvider( tracingStatsHandler telemetry.ClientStatsHandler, monitor membership.Monitor, dc *dynamicconfig.Collection, -) (common.RPCFactory, error) { +) (*rpc.RPCFactory, error) { frontendURL, frontendHTTPURL, frontendHTTPPort, frontendTLSConfig, err := GetFrontendConnectionDetails(cfg, tlsConfigProvider, resolver) if err != nil { return nil, err @@ -370,7 +370,6 @@ func RPCFactoryProvider( frontendHTTPPort, frontendTLSConfig, options, - map[primitives.ServiceName][]grpc.DialOption{}, monitor, ) factory.EnableInternodeServerKeepalive = enableServerKeepalive diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index 36257c99a85..92ee63e022b 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -42,11 +42,11 @@ type RPCFactory struct { frontendHTTPPort int frontendTLSConfig *tls.Config - grpcListener func() net.Listener - tlsFactory encryption.TLSConfigProvider - dialOptions []grpc.DialOption - serviceDialOptions map[primitives.ServiceName][]grpc.DialOption - monitor membership.Monitor + grpcListener func() net.Listener + tlsFactory encryption.TLSConfigProvider + commonDialOptions []grpc.DialOption + perServiceDialOptions map[primitives.ServiceName][]grpc.DialOption + monitor membership.Monitor // A OnceValues wrapper for createLocalFrontendHTTPClient. localFrontendClient func() (*common.FrontendHTTPClient, error) interNodeGrpcConnections cache.Cache @@ -68,23 +68,22 @@ func NewFactory( frontendHTTPURL string, frontendHTTPPort int, frontendTLSConfig *tls.Config, - dialOptions []grpc.DialOption, - serviceDialOptions map[primitives.ServiceName][]grpc.DialOption, + commonDialOptions []grpc.DialOption, monitor membership.Monitor, ) *RPCFactory { f := &RPCFactory{ - config: cfg, - serviceName: sName, - logger: logger, - metricsHandler: metricsHandler, - frontendURL: frontendURL, - frontendHTTPURL: frontendHTTPURL, - frontendHTTPPort: frontendHTTPPort, - frontendTLSConfig: frontendTLSConfig, - tlsFactory: tlsProvider, - dialOptions: dialOptions, - serviceDialOptions: serviceDialOptions, - monitor: monitor, + config: cfg, + serviceName: sName, + logger: logger, + metricsHandler: metricsHandler, + frontendURL: frontendURL, + frontendHTTPURL: frontendHTTPURL, + frontendHTTPPort: frontendHTTPPort, + frontendTLSConfig: frontendTLSConfig, + tlsFactory: tlsProvider, + commonDialOptions: commonDialOptions, + perServiceDialOptions: map[primitives.ServiceName][]grpc.DialOption{}, + monitor: monitor, } f.grpcListener = sync.OnceValue(f.createGRPCListener) f.localFrontendClient = sync.OnceValues(f.createLocalFrontendHTTPClient) @@ -92,6 +91,12 @@ func NewFactory( return f } +// SetPerServiceDialOptions sets the per-service dial options. +// This should be called when decorating the RPCFactory before connections are created. +func (d *RPCFactory) SetPerServiceDialOptions(perServiceDialOptions map[primitives.ServiceName][]grpc.DialOption) { + d.perServiceDialOptions = perServiceDialOptions +} + func (d *RPCFactory) GetFrontendGRPCServerOptions() ([]grpc.ServerOption, error) { var opts []grpc.ServerOption @@ -217,13 +222,16 @@ func (d *RPCFactory) CreateRemoteFrontendGRPCConnection(rpcAddress string) *grpc } } keepAliveOption := d.getClientKeepAliveConfig(primitives.FrontendService) + additionalDialOptions := append([]grpc.DialOption{}, d.perServiceDialOptions[primitives.FrontendService]...) - return d.dial(rpcAddress, tlsClientConfig, keepAliveOption) + return d.dial(rpcAddress, tlsClientConfig, append(additionalDialOptions, keepAliveOption)...) } // CreateLocalFrontendGRPCConnection creates connection for internal frontend calls func (d *RPCFactory) CreateLocalFrontendGRPCConnection() *grpc.ClientConn { - return d.dial(d.frontendURL, d.frontendTLSConfig) + additionalDialOptions := append([]grpc.DialOption{}, d.perServiceDialOptions[primitives.InternalFrontendService]...) + + return d.dial(d.frontendURL, d.frontendTLSConfig, additionalDialOptions...) } // createInternodeGRPCConnection creates connection for gRPC calls @@ -240,7 +248,7 @@ func (d *RPCFactory) createInternodeGRPCConnection(hostName string, serviceName return nil } } - additionalDialOptions := append([]grpc.DialOption{}, d.serviceDialOptions[serviceName]...) + additionalDialOptions := append([]grpc.DialOption{}, d.perServiceDialOptions[serviceName]...) c := d.dial(hostName, tlsClientConfig, append(additionalDialOptions, d.getClientKeepAliveConfig(serviceName))...) d.interNodeGrpcConnections.Put(hostName, c) return c @@ -255,7 +263,7 @@ func (d *RPCFactory) CreateMatchingGRPCConnection(rpcAddress string) *grpc.Clien } func (d *RPCFactory) dial(hostName string, tlsClientConfig *tls.Config, dialOptions ...grpc.DialOption) *grpc.ClientConn { - dialOptions = append(d.dialOptions, dialOptions...) + dialOptions = append(d.commonDialOptions, dialOptions...) connection, err := Dial(hostName, tlsClientConfig, d.logger, d.metricsHandler, dialOptions...) if err != nil { d.logger.Fatal("Failed to create gRPC connection", tag.Error(err)) From 7f654324b9934df489b5621cf709ba9e7198cac8 Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Mon, 15 Sep 2025 16:25:53 -0400 Subject: [PATCH 03/12] Fix unit tests --- common/rpc/test/http_test.go | 3 --- common/rpc/test/rpc_localstore_tls_test.go | 24 +++++++++++----------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/common/rpc/test/http_test.go b/common/rpc/test/http_test.go index 8101533fce3..f362b0bede7 100644 --- a/common/rpc/test/http_test.go +++ b/common/rpc/test/http_test.go @@ -41,7 +41,6 @@ func TestCreateLocalFrontendHTTPClient_UsingMembership(t *testing.T) { int(port), nil, // No TLS nil, - nil, monitor, ) @@ -73,7 +72,6 @@ func TestCreateLocalFrontendHTTPClient_UsingFixedHostPort(t *testing.T) { 0, // Port is unused nil, // No TLS nil, - nil, nil, // monitor should not be used ) @@ -106,7 +104,6 @@ func TestCreateLocalFrontendHTTPClient_UsingFixedHostPort_AndTLS(t *testing.T) { 0, // Port is unused tlsConfig, nil, - nil, nil, // monitor should not be used ) diff --git a/common/rpc/test/rpc_localstore_tls_test.go b/common/rpc/test/rpc_localstore_tls_test.go index a05a8224e4e..cc31873c346 100644 --- a/common/rpc/test/rpc_localstore_tls_test.go +++ b/common/rpc/test/rpc_localstore_tls_test.go @@ -110,7 +110,7 @@ func (s *localStoreRPCSuite) SetupSuite() { provider, err := encryption.NewTLSConfigProviderFromConfig(serverCfgInsecure.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - insecureFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil, nil) + insecureFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil) s.NotNil(insecureFactory) s.insecureRPCFactory = i(insecureFactory) @@ -320,26 +320,26 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err := provider.GetFrontendClientConfig() s.NoError(err) - frontendMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) + frontendMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) s.NotNil(frontendMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - frontendServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil, nil) + frontendServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil) s.NotNil(frontendServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSSystemWorker.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) + frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) s.NotNil(frontendSystemWorkerMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - frontendMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) + frontendMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) s.NotNil(frontendMutualTLSRefreshFactory) s.frontendMutualTLSRPCFactory = f(frontendMutualTLSFactory) @@ -356,7 +356,7 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err = s.dynamicConfigProvider.GetFrontendClientConfig() s.NoError(err) - dynamicServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, s.dynamicConfigProvider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) + dynamicServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, s.dynamicConfigProvider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) s.frontendDynamicTLSFactory = f(dynamicServerTLSFactory) s.internodeDynamicTLSFactory = i(dynamicServerTLSFactory) @@ -366,7 +366,7 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - frontendRootCAForceTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) + frontendRootCAForceTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) s.NotNil(frontendServerTLSFactory) s.frontendConfigRootCAForceTLSFactory = f(frontendRootCAForceTLSFactory) @@ -374,7 +374,7 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - remoteClusterMutualTLSRPCFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) + remoteClusterMutualTLSRPCFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) s.NotNil(remoteClusterMutualTLSRPCFactory) s.remoteClusterMutualTLSRPCFactory = r(remoteClusterMutualTLSRPCFactory) } @@ -412,28 +412,28 @@ func (s *localStoreRPCSuite) setupInternode() { s.NoError(err) tlsConfig, err := provider.GetFrontendClientConfig() s.NoError(err) - internodeMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) + internodeMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) s.NotNil(internodeMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - internodeServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) + internodeServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) s.NotNil(internodeServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreAltMutualTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - internodeMutualAltTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) + internodeMutualAltTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) s.NotNil(internodeMutualAltTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - internodeMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) + internodeMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) s.NotNil(internodeMutualTLSRefreshFactory) s.internodeMutualTLSRPCFactory = i(internodeMutualTLSFactory) From 2253e52b0c82cae7bc65beaaa8f1f5a5578e06d4 Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Mon, 15 Sep 2025 17:01:02 -0400 Subject: [PATCH 04/12] Fix unit tests --- tests/testcore/onebox.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/testcore/onebox.go b/tests/testcore/onebox.go index fc608d279ac..bba61f135ef 100644 --- a/tests/testcore/onebox.go +++ b/tests/testcore/onebox.go @@ -740,7 +740,6 @@ func (c *TemporalImpl) newRPCFactory( int(httpPort), frontendTLSConfig, options, - map[primitives.ServiceName][]grpc.DialOption{}, monitor, ), nil } From d94e3ece0e0013bf3b6e59cc38eede263202a45d Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Mon, 15 Sep 2025 18:59:54 -0400 Subject: [PATCH 05/12] Use shared factory provider with perServiceDialOptions --- common/resource/fx.go | 11 ++++++++-- common/rpc/rpc.go | 7 +------ common/rpc/test/http_test.go | 4 ++++ common/rpc/test/rpc_localstore_tls_test.go | 24 +++++++++++----------- tests/testcore/onebox.go | 1 + 5 files changed, 27 insertions(+), 20 deletions(-) diff --git a/common/resource/fx.go b/common/resource/fx.go index c35b0ee73e6..6469a6cf9d9 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -108,7 +108,8 @@ var Module = fx.Options( ) var DefaultOptions = fx.Options( - fx.Provide(fx.Annotate(RPCFactoryProvider, fx.As(new(common.RPCFactory)))), + fx.Provide(RPCFactoryProvider), + fx.Provide(PerServiceDialOptionsProvider), fx.Provide(ArchivalMetadataProvider), fx.Provide(ArchiverProviderProvider), fx.Provide(ThrottledLoggerProvider), @@ -337,6 +338,10 @@ func DCRedirectionPolicyProvider(cfg *config.Config) config.DCRedirectionPolicy return cfg.DCRedirectionPolicy } +func PerServiceDialOptionsProvider() map[primitives.ServiceName][]grpc.DialOption { + return map[primitives.ServiceName][]grpc.DialOption{} +} + func RPCFactoryProvider( cfg *config.Config, svcName primitives.ServiceName, @@ -345,9 +350,10 @@ func RPCFactoryProvider( tlsConfigProvider encryption.TLSConfigProvider, resolver *membership.GRPCResolver, tracingStatsHandler telemetry.ClientStatsHandler, + perServiceDialOptions map[primitives.ServiceName][]grpc.DialOption, monitor membership.Monitor, dc *dynamicconfig.Collection, -) (*rpc.RPCFactory, error) { +) (common.RPCFactory, error) { frontendURL, frontendHTTPURL, frontendHTTPPort, frontendTLSConfig, err := GetFrontendConnectionDetails(cfg, tlsConfigProvider, resolver) if err != nil { return nil, err @@ -370,6 +376,7 @@ func RPCFactoryProvider( frontendHTTPPort, frontendTLSConfig, options, + perServiceDialOptions, monitor, ) factory.EnableInternodeServerKeepalive = enableServerKeepalive diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index 92ee63e022b..41c5659d8ec 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -69,6 +69,7 @@ func NewFactory( frontendHTTPPort int, frontendTLSConfig *tls.Config, commonDialOptions []grpc.DialOption, + perServiceDialOptions map[primitives.ServiceName][]grpc.DialOption, monitor membership.Monitor, ) *RPCFactory { f := &RPCFactory{ @@ -91,12 +92,6 @@ func NewFactory( return f } -// SetPerServiceDialOptions sets the per-service dial options. -// This should be called when decorating the RPCFactory before connections are created. -func (d *RPCFactory) SetPerServiceDialOptions(perServiceDialOptions map[primitives.ServiceName][]grpc.DialOption) { - d.perServiceDialOptions = perServiceDialOptions -} - func (d *RPCFactory) GetFrontendGRPCServerOptions() ([]grpc.ServerOption, error) { var opts []grpc.ServerOption diff --git a/common/rpc/test/http_test.go b/common/rpc/test/http_test.go index f362b0bede7..aa159b0b638 100644 --- a/common/rpc/test/http_test.go +++ b/common/rpc/test/http_test.go @@ -12,6 +12,7 @@ import ( "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/rpc" "go.uber.org/mock/gomock" + "google.golang.org/grpc" ) func TestCreateLocalFrontendHTTPClient_UsingMembership(t *testing.T) { @@ -41,6 +42,7 @@ func TestCreateLocalFrontendHTTPClient_UsingMembership(t *testing.T) { int(port), nil, // No TLS nil, + map[primitives.ServiceName][]grpc.DialOption{}, monitor, ) @@ -72,6 +74,7 @@ func TestCreateLocalFrontendHTTPClient_UsingFixedHostPort(t *testing.T) { 0, // Port is unused nil, // No TLS nil, + map[primitives.ServiceName][]grpc.DialOption{}, nil, // monitor should not be used ) @@ -104,6 +107,7 @@ func TestCreateLocalFrontendHTTPClient_UsingFixedHostPort_AndTLS(t *testing.T) { 0, // Port is unused tlsConfig, nil, + map[primitives.ServiceName][]grpc.DialOption{}, nil, // monitor should not be used ) diff --git a/common/rpc/test/rpc_localstore_tls_test.go b/common/rpc/test/rpc_localstore_tls_test.go index cc31873c346..a05a8224e4e 100644 --- a/common/rpc/test/rpc_localstore_tls_test.go +++ b/common/rpc/test/rpc_localstore_tls_test.go @@ -110,7 +110,7 @@ func (s *localStoreRPCSuite) SetupSuite() { provider, err := encryption.NewTLSConfigProviderFromConfig(serverCfgInsecure.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - insecureFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil) + insecureFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil, nil) s.NotNil(insecureFactory) s.insecureRPCFactory = i(insecureFactory) @@ -320,26 +320,26 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err := provider.GetFrontendClientConfig() s.NoError(err) - frontendMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + frontendMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(frontendMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - frontendServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil) + frontendServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil, nil) s.NotNil(frontendServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSSystemWorker.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(frontendSystemWorkerMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - frontendMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + frontendMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(frontendMutualTLSRefreshFactory) s.frontendMutualTLSRPCFactory = f(frontendMutualTLSFactory) @@ -356,7 +356,7 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err = s.dynamicConfigProvider.GetFrontendClientConfig() s.NoError(err) - dynamicServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, s.dynamicConfigProvider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + dynamicServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, s.dynamicConfigProvider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.frontendDynamicTLSFactory = f(dynamicServerTLSFactory) s.internodeDynamicTLSFactory = i(dynamicServerTLSFactory) @@ -366,7 +366,7 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - frontendRootCAForceTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + frontendRootCAForceTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(frontendServerTLSFactory) s.frontendConfigRootCAForceTLSFactory = f(frontendRootCAForceTLSFactory) @@ -374,7 +374,7 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - remoteClusterMutualTLSRPCFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + remoteClusterMutualTLSRPCFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(remoteClusterMutualTLSRPCFactory) s.remoteClusterMutualTLSRPCFactory = r(remoteClusterMutualTLSRPCFactory) } @@ -412,28 +412,28 @@ func (s *localStoreRPCSuite) setupInternode() { s.NoError(err) tlsConfig, err := provider.GetFrontendClientConfig() s.NoError(err) - internodeMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + internodeMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(internodeMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - internodeServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + internodeServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(internodeServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreAltMutualTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - internodeMutualAltTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + internodeMutualAltTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(internodeMutualAltTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - internodeMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + internodeMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(internodeMutualTLSRefreshFactory) s.internodeMutualTLSRPCFactory = i(internodeMutualTLSFactory) diff --git a/tests/testcore/onebox.go b/tests/testcore/onebox.go index bba61f135ef..fc608d279ac 100644 --- a/tests/testcore/onebox.go +++ b/tests/testcore/onebox.go @@ -740,6 +740,7 @@ func (c *TemporalImpl) newRPCFactory( int(httpPort), frontendTLSConfig, options, + map[primitives.ServiceName][]grpc.DialOption{}, monitor, ), nil } From da6a0271dd10d6d101c710de4eb0b314f223eedb Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Tue, 16 Sep 2025 14:06:53 -0400 Subject: [PATCH 06/12] Correct getFrontendConnectionDetails method visibility --- common/resource/fx.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/common/resource/fx.go b/common/resource/fx.go index 6469a6cf9d9..00e9baaac9f 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -321,7 +321,7 @@ func SdkClientFactoryProvider( resolver *membership.GRPCResolver, dc *dynamicconfig.Collection, ) (sdk.ClientFactory, error) { - frontendURL, _, _, frontendTLSConfig, err := GetFrontendConnectionDetails(cfg, tlsConfigProvider, resolver) + frontendURL, _, _, frontendTLSConfig, err := getFrontendConnectionDetails(cfg, tlsConfigProvider, resolver) if err != nil { return nil, err } @@ -354,7 +354,7 @@ func RPCFactoryProvider( monitor membership.Monitor, dc *dynamicconfig.Collection, ) (common.RPCFactory, error) { - frontendURL, frontendHTTPURL, frontendHTTPPort, frontendTLSConfig, err := GetFrontendConnectionDetails(cfg, tlsConfigProvider, resolver) + frontendURL, frontendHTTPURL, frontendHTTPPort, frontendTLSConfig, err := getFrontendConnectionDetails(cfg, tlsConfigProvider, resolver) if err != nil { return nil, err } @@ -392,7 +392,7 @@ func FrontendHTTPClientCacheProvider( return cluster.NewFrontendHTTPClientCache(metadata, tlsConfigProvider) } -func GetFrontendConnectionDetails( +func getFrontendConnectionDetails( cfg *config.Config, tlsConfigProvider encryption.TLSConfigProvider, resolver *membership.GRPCResolver, From ddd6ce81f93a0826214aa67390bd61287f740678 Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Tue, 16 Sep 2025 18:38:13 -0400 Subject: [PATCH 07/12] Add debug line --- common/rpc/rpc.go | 1 + 1 file changed, 1 insertion(+) diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index 41c5659d8ec..2cf1825c374 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -244,6 +244,7 @@ func (d *RPCFactory) createInternodeGRPCConnection(hostName string, serviceName } } additionalDialOptions := append([]grpc.DialOption{}, d.perServiceDialOptions[serviceName]...) + fmt.Println("CreateInternodeGRPCConnection Alan Wu", hostName, serviceName, additionalDialOptions) c := d.dial(hostName, tlsClientConfig, append(additionalDialOptions, d.getClientKeepAliveConfig(serviceName))...) d.interNodeGrpcConnections.Put(hostName, c) return c From bc97078c9cbf8e3d7b2fe63d7851656330d597ac Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Tue, 16 Sep 2025 19:04:30 -0400 Subject: [PATCH 08/12] Remove fmt print line --- common/rpc/rpc.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index 2cf1825c374..6a4ef352054 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -244,7 +244,8 @@ func (d *RPCFactory) createInternodeGRPCConnection(hostName string, serviceName } } additionalDialOptions := append([]grpc.DialOption{}, d.perServiceDialOptions[serviceName]...) - fmt.Println("CreateInternodeGRPCConnection Alan Wu", hostName, serviceName, additionalDialOptions) + d.logger.Info("CreateInternodeGRPCConnection Alan Wu", tag.Address(hostName), tag.Service(serviceName), + tag.DetailInfo(fmt.Sprintf("additional dial options: %v", additionalDialOptions))) c := d.dial(hostName, tlsClientConfig, append(additionalDialOptions, d.getClientKeepAliveConfig(serviceName))...) d.interNodeGrpcConnections.Put(hostName, c) return c From 495f62d2540835bec94edfb1d1e6c40fab5a4679 Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Tue, 16 Sep 2025 21:18:50 -0400 Subject: [PATCH 09/12] Use perServiceDialOptions in provider --- common/rpc/rpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index 6a4ef352054..ad8e02e3d4a 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -83,7 +83,7 @@ func NewFactory( frontendTLSConfig: frontendTLSConfig, tlsFactory: tlsProvider, commonDialOptions: commonDialOptions, - perServiceDialOptions: map[primitives.ServiceName][]grpc.DialOption{}, + perServiceDialOptions: perServiceDialOptions, monitor: monitor, } f.grpcListener = sync.OnceValue(f.createGRPCListener) From 3fe3e15632bf85828bcc80bde652e0d4a619841a Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Wed, 17 Sep 2025 13:31:18 -0400 Subject: [PATCH 10/12] Revert "Remove fmt print line" This reverts commit bc97078c9cbf8e3d7b2fe63d7851656330d597ac. --- common/rpc/rpc.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index ad8e02e3d4a..4314c696ba8 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -244,8 +244,7 @@ func (d *RPCFactory) createInternodeGRPCConnection(hostName string, serviceName } } additionalDialOptions := append([]grpc.DialOption{}, d.perServiceDialOptions[serviceName]...) - d.logger.Info("CreateInternodeGRPCConnection Alan Wu", tag.Address(hostName), tag.Service(serviceName), - tag.DetailInfo(fmt.Sprintf("additional dial options: %v", additionalDialOptions))) + fmt.Println("CreateInternodeGRPCConnection Alan Wu", hostName, serviceName, additionalDialOptions) c := d.dial(hostName, tlsClientConfig, append(additionalDialOptions, d.getClientKeepAliveConfig(serviceName))...) d.interNodeGrpcConnections.Put(hostName, c) return c From a552b18f0e0baf6b5134c083edec46dd2a97479a Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Wed, 17 Sep 2025 13:31:28 -0400 Subject: [PATCH 11/12] Revert "Add debug line" This reverts commit ddd6ce81f93a0826214aa67390bd61287f740678. --- common/rpc/rpc.go | 1 - 1 file changed, 1 deletion(-) diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index 4314c696ba8..663f5464781 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -244,7 +244,6 @@ func (d *RPCFactory) createInternodeGRPCConnection(hostName string, serviceName } } additionalDialOptions := append([]grpc.DialOption{}, d.perServiceDialOptions[serviceName]...) - fmt.Println("CreateInternodeGRPCConnection Alan Wu", hostName, serviceName, additionalDialOptions) c := d.dial(hostName, tlsClientConfig, append(additionalDialOptions, d.getClientKeepAliveConfig(serviceName))...) d.interNodeGrpcConnections.Put(hostName, c) return c From 28a3a75e1cf90705f0f6882cba52105319e95775 Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Wed, 17 Sep 2025 17:13:36 -0400 Subject: [PATCH 12/12] Add service specific dial options in rpc factory