Skip to content

Commit a393cc4

Browse files
committed
HIVE-29238:upgrade kafka version to fix CVE-2024-31141 and CVE-2021-38153
1 parent e44cf34 commit a393cc4

File tree

5 files changed

+24
-11
lines changed

5 files changed

+24
-11
lines changed

itests/qtest-druid/pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
<druid.derby.version>10.11.1.1</druid.derby.version>
3737
<druid.guava.version>16.0.1</druid.guava.version>
3838
<druid.guice.version>4.1.0</druid.guice.version>
39-
<kafka.test.version>2.5.0</kafka.test.version>
39+
<kafka.test.version>3.9.1</kafka.test.version>
4040
<druid.guice.version>4.1.0</druid.guice.version>
4141
<slf4j.version>1.7.30</slf4j.version>
4242
</properties>
@@ -226,6 +226,11 @@
226226
<artifactId>kafka-clients</artifactId>
227227
<version>${kafka.test.version}</version>
228228
</dependency>
229+
<dependency>
230+
<groupId>org.apache.kafka</groupId>
231+
<artifactId>kafka-server</artifactId>
232+
<version>${kafka.test.version}</version>
233+
</dependency>
229234
<dependency>
230235
<groupId>org.slf4j</groupId>
231236
<artifactId>slf4j-api</artifactId>

itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.hive.kafka;
2020

2121
import kafka.server.KafkaConfig;
22-
import kafka.server.KafkaServerStartable;
22+
import kafka.server.KafkaServer;
2323

2424
import org.apache.commons.io.FileUtils;
2525
import org.apache.hadoop.service.AbstractService;
@@ -29,6 +29,7 @@
2929
import org.apache.kafka.clients.producer.ProducerRecord;
3030
import org.apache.kafka.common.serialization.ByteArraySerializer;
3131
import org.apache.kafka.common.serialization.StringSerializer;
32+
import org.apache.kafka.common.utils.Time;
3233

3334
import com.google.common.base.Throwables;
3435
import com.google.common.io.Files;
@@ -43,6 +44,7 @@
4344
import java.util.List;
4445
import java.util.Properties;
4546
import java.util.stream.IntStream;
47+
import scala.Option;
4648

4749
/**
4850
* This class has the hooks to start and stop single node kafka cluster.
@@ -54,7 +56,7 @@ public class SingleNodeKafkaCluster extends AbstractService {
5456
private static final String LOCALHOST = "localhost";
5557

5658

57-
private final KafkaServerStartable serverStartable;
59+
private final KafkaServer server;
5860
private final int brokerPort;
5961
private final String kafkaServer;
6062

@@ -94,21 +96,21 @@ public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort, Intege
9496
properties.setProperty("transaction.state.log.min.isr", String.valueOf(1));
9597
properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577");
9698

97-
this.serverStartable = new KafkaServerStartable(KafkaConfig.fromProps(properties));
99+
this.server = new KafkaServer(KafkaConfig.fromProps(properties),Time.SYSTEM, Option.empty(), false);
98100
}
99101

100102

101103
@Override
102104
protected void serviceStart() throws Exception {
103-
serverStartable.startup();
105+
server.startup();
104106
log.info("Kafka Server Started on port {}", brokerPort);
105107

106108
}
107109

108110
@Override
109111
protected void serviceStop() throws Exception {
110112
log.info("Stopping Kafka Server");
111-
serverStartable.shutdown();
113+
server.shutdown();
112114
log.info("Kafka Server Stopped");
113115
}
114116

kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.kafka.common.PartitionInfo;
3434
import org.apache.kafka.common.TopicPartition;
3535
import org.apache.kafka.common.errors.ProducerFencedException;
36+
import org.apache.kafka.common.Uuid;
3637
import org.slf4j.Logger;
3738
import org.slf4j.LoggerFactory;
3839

@@ -67,6 +68,11 @@ class HiveKafkaProducer<K, V> implements Producer<K, V> {
6768
kafkaProducer = new KafkaProducer<>(properties);
6869
}
6970

71+
@Override
72+
public Uuid clientInstanceId(Duration timeout) {
73+
throw new UnsupportedOperationException();
74+
}
75+
7076
@Override public void initTransactions() {
7177
kafkaProducer.initTransactions();
7278
}

kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import kafka.zk.EmbeddedZookeeper;
2727
import org.apache.commons.io.FileUtils;
2828
import org.apache.hadoop.hive.common.IPStackUtils;
29-
import org.apache.kafka.common.network.Mode;
29+
import org.apache.kafka.common.network.ConnectionMode;
3030
import org.apache.kafka.common.utils.Time;
3131
import org.apache.kafka.test.TestSslUtils;
3232
import org.junit.rules.ExternalResource;
@@ -106,7 +106,7 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
106106
brokerProps.setProperty("listener.name.l2.gssapi.sasl.jaas.config", jaasConfig);
107107
brokerProps.setProperty("listener.name.l3.gssapi.sasl.jaas.config", jaasConfig);
108108
truststoreFile = File.createTempFile("kafka_truststore", "jks");
109-
brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(Mode.SERVER).createNewTrustStore(truststoreFile).build());
109+
brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(ConnectionMode.SERVER).createNewTrustStore(truststoreFile).build());
110110
brokerProps.setProperty("delegation.token.master.key", "AnyValueShouldDoHereItDoesntMatter");
111111
}
112112
brokerProps.setProperty("offsets.topic.replication.factor", "1");
@@ -116,9 +116,9 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
116116
kafkaServer = TestUtils.createServer(config, Time.SYSTEM);
117117
kafkaServer.startup();
118118
kafkaServer.zkClient();
119-
adminZkClient = new AdminZkClient(kafkaServer.zkClient());
119+
adminZkClient = new AdminZkClient(kafkaServer.zkClient(),null);
120120
LOG.info("Creating kafka TOPIC [{}]", TOPIC);
121-
adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
121+
adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$,false);
122122
}
123123

124124
/**

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@
172172
<junit.version>4.13.2</junit.version>
173173
<junit.jupiter.version>5.13.3</junit.jupiter.version>
174174
<junit.vintage.version>5.13.3</junit.vintage.version>
175-
<kafka.version>2.5.0</kafka.version>
175+
<kafka.version>3.9.1</kafka.version>
176176
<kryo.version>5.5.0</kryo.version>
177177
<reflectasm.version>1.11.9</reflectasm.version>
178178
<kudu.version>1.17.0</kudu.version>

0 commit comments

Comments
 (0)