@@ -4,15 +4,13 @@ use crate::utils::logging::init_test_logger;
44use crate :: utils:: rand:: { rand_test_group, rand_test_topic} ;
55use crate :: utils:: { get_broker_version, KafkaVersion } ;
66use backon:: { BlockingRetryable , ExponentialBuilder } ;
7- use rdkafka:: admin:: {
8- AdminOptions , AlterConfig , ConfigEntry , ConfigSource , GroupResult , NewPartitions , NewTopic ,
9- OwnedResourceSpecifier , ResourceSpecifier , TopicReplication ,
10- } ;
7+ use rdkafka:: admin:: { AdminClient , AdminOptions , AlterConfig , ConfigEntry , ConfigSource , GroupResult , NewPartitions , NewTopic , OwnedResourceSpecifier , ResourceSpecifier , TopicReplication } ;
118use rdkafka:: error:: KafkaError ;
129use rdkafka:: producer:: { FutureRecord , Producer } ;
13- use rdkafka:: { Offset , TopicPartitionList } ;
10+ use rdkafka:: { ClientConfig , Offset , TopicPartitionList } ;
1411use rdkafka_sys:: RDKafkaErrorCode ;
1512use std:: time:: Duration ;
13+ use rdkafka:: client:: DefaultClientContext ;
1614
1715#[ path = "utils/mod.rs" ]
1816mod utils;
@@ -624,18 +622,48 @@ async fn test_configs() {
624622 assert_eq ! ( res, & [ Ok ( OwnedResourceSpecifier :: Broker ( utils:: BROKER_ID ) ) ] ) ;
625623}
626624
625+ /// Tests whether each admin operation properly reports an error if the entire
626+ /// request fails. The original implementations failed to check this, resulting
627+ /// in confusing situations where a failed admin request would return Ok([]).
627628#[ tokio:: test]
628- async fn test_groups ( ) {
629- init_test_logger ( ) ;
629+ async fn test_event_errors ( ) {
630+ // Configure an admin client to target a Kafka server that doesn't exist,
631+ // then set an impossible timeout. This will ensure that every request fails
632+ // with an OperationTimedOut error, assuming, of course, that the request
633+ // passes client-side validation.
634+ let admin_client = ClientConfig :: new ( )
635+ . set ( "bootstrap.servers" , "noexist" )
636+ . create :: < AdminClient < DefaultClientContext > > ( )
637+ . expect ( "admin client creation failed" ) ;
638+ let opts = AdminOptions :: new ( ) . request_timeout ( Some ( Duration :: from_nanos ( 1 ) ) ) ;
639+
640+ let res = admin_client. create_topics ( & [ ] , & opts) . await ;
641+ assert_eq ! (
642+ res,
643+ Err ( KafkaError :: AdminOp ( RDKafkaErrorCode :: OperationTimedOut ) )
644+ ) ;
630645
631- // Get Kafka container context.
632- let kafka_context = KafkaContext :: shared ( )
633- . await
634- . expect ( "could not create kafka context" ) ;
646+ let res = admin_client. create_partitions ( & [ ] , & opts) . await ;
647+ assert_eq ! (
648+ res,
649+ Err ( KafkaError :: AdminOp ( RDKafkaErrorCode :: OperationTimedOut ) )
650+ ) ;
635651
636- // Create admin client
637- let admin_client = utils:: admin:: create_admin_client ( & kafka_context. bootstrap_servers )
638- . await
639- . expect ( "could not create admin client" ) ;
640- let opts = AdminOptions :: new ( ) ;
641- }
652+ let res = admin_client. delete_topics ( & [ ] , & opts) . await ;
653+ assert_eq ! (
654+ res,
655+ Err ( KafkaError :: AdminOp ( RDKafkaErrorCode :: OperationTimedOut ) )
656+ ) ;
657+
658+ let res = admin_client. describe_configs ( & [ ] , & opts) . await ;
659+ assert_eq ! (
660+ res. err( ) ,
661+ Some ( KafkaError :: AdminOp ( RDKafkaErrorCode :: OperationTimedOut ) )
662+ ) ;
663+
664+ let res = admin_client. alter_configs ( & [ ] , & opts) . await ;
665+ assert_eq ! (
666+ res,
667+ Err ( KafkaError :: AdminOp ( RDKafkaErrorCode :: OperationTimedOut ) )
668+ ) ;
669+ }
0 commit comments