
Commit 8120258

HIVE-29238: upgrade Kafka version to fix CVE-2024-31141 and CVE-2021-38153
1 parent e44cf34 commit 8120258
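
For context: CVE-2021-38153 is the Kafka timing side channel where some SASL components compared secrets with Arrays.equals, making brute-force attempts easier, and CVE-2024-31141 is the kafka-clients issue where config providers could be abused to read arbitrary files. Both are addressed on the 3.x line that this change moves to.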

File tree: 8 files changed (+52, −22 lines)


itests/qtest-druid/pom.xml

Lines changed: 6 additions & 1 deletion
@@ -36,7 +36,7 @@
     <druid.derby.version>10.11.1.1</druid.derby.version>
     <druid.guava.version>16.0.1</druid.guava.version>
     <druid.guice.version>4.1.0</druid.guice.version>
-    <kafka.test.version>2.5.0</kafka.test.version>
+    <kafka.test.version>3.9.1</kafka.test.version>
     <druid.guice.version>4.1.0</druid.guice.version>
     <slf4j.version>1.7.30</slf4j.version>
   </properties>
@@ -226,6 +226,11 @@
       <artifactId>kafka-clients</artifactId>
       <version>${kafka.test.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-server</artifactId>
+      <version>${kafka.test.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
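
The new kafka-server dependency accompanies the switch below from KafkaServerStartable to kafka.server.KafkaServer: the 3.x line splits broker-side code across more artifacts than 2.5 did, so the test module now declares the broker-side artifact explicitly alongside kafka-clients.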

itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java

Lines changed: 7 additions & 5 deletions
@@ -19,7 +19,7 @@
 package org.apache.hive.kafka;
 
 import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import kafka.server.KafkaServer;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.service.AbstractService;
@@ -29,6 +29,7 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
 
 import com.google.common.base.Throwables;
 import com.google.common.io.Files;
@@ -43,6 +44,7 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.stream.IntStream;
+import scala.Option;
 
 /**
  * This class has the hooks to start and stop single node kafka cluster.
@@ -54,7 +56,7 @@ public class SingleNodeKafkaCluster extends AbstractService {
   private static final String LOCALHOST = "localhost";
 
 
-  private final KafkaServerStartable serverStartable;
+  private final KafkaServer server;
   private final int brokerPort;
   private final String kafkaServer;
 
@@ -94,21 +96,21 @@ public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort, Intege
     properties.setProperty("transaction.state.log.min.isr", String.valueOf(1));
     properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577");
 
-    this.serverStartable = new KafkaServerStartable(KafkaConfig.fromProps(properties));
+    this.server = new KafkaServer(KafkaConfig.fromProps(properties), Time.SYSTEM, Option.empty(), false);
   }
 
 
   @Override
   protected void serviceStart() throws Exception {
-    serverStartable.startup();
+    server.startup();
     log.info("Kafka Server Started on port {}", brokerPort);
 
   }
 
   @Override
   protected void serviceStop() throws Exception {
     log.info("Stopping Kafka Server");
-    serverStartable.shutdown();
+    server.shutdown();
     log.info("Kafka Server Stopped");
   }
 
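
KafkaServerStartable no longer exists in Kafka 3.x, so the thin wrapper is dropped and the broker is constructed directly. A minimal, self-contained sketch of the new lifecycle follows; the ZooKeeper address, listener port, and log dir are placeholders, not values from this patch.

// Sketch: start and stop an embedded ZK-mode broker with the Kafka 3.9 API.
// Assumes a ZooKeeper instance is already listening on localhost:2181.
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.kafka.common.utils.Time;
import scala.Option;

public class EmbeddedBrokerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("zookeeper.connect", "localhost:2181"); // placeholder ZK address
    props.setProperty("broker.id", "0");
    props.setProperty("listeners", "PLAINTEXT://localhost:9092"); // placeholder port
    props.setProperty("log.dir", System.getProperty("java.io.tmpdir") + "/kafka-logs");
    props.setProperty("offsets.topic.replication.factor", "1");

    // Same constructor the patch uses: wall-clock time, no thread-name prefix,
    // and forwarding to the controller disabled.
    KafkaServer server =
        new KafkaServer(KafkaConfig.fromProps(props), Time.SYSTEM, Option.empty(), false);
    server.startup();
    try {
      // ... interact with the broker on localhost:9092 ...
    } finally {
      server.shutdown();
      server.awaitShutdown(); // block until the broker has fully stopped
    }
  }
}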

kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java

Lines changed: 19 additions & 6 deletions
@@ -27,12 +27,15 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.Uuid;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +47,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.Future;
 
 /**
@@ -67,6 +71,11 @@ class HiveKafkaProducer<K, V> implements Producer<K, V> {
     kafkaProducer = new KafkaProducer<>(properties);
   }
 
+  @Override
+  public Uuid clientInstanceId(Duration timeout) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override public void initTransactions() {
     kafkaProducer.initTransactions();
   }
@@ -138,11 +147,11 @@ synchronized void resumeTransaction(long producerId, short epoch) {
 
     Object transactionManager = getValue(kafkaProducer, "transactionManager");
 
-    Object topicPartitionBookkeeper = getValue(transactionManager, "topicPartitionBookkeeper");
+    Object txnPartitionMap = getValue(transactionManager, "txnPartitionMap");
     invoke(transactionManager,
         "transitionTo",
         getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-    invoke(topicPartitionBookkeeper, "reset");
+    invoke(txnPartitionMap, "reset");
     Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
     setValue(producerIdAndEpoch, "producerId", producerId);
     setValue(producerIdAndEpoch, "epoch", epoch);
@@ -181,10 +190,14 @@ short getEpoch() {
    */
   private void flushNewPartitions() {
     LOG.info("Flushing new partitions");
-    TransactionalRequestResult result = enqueueNewPartitions();
-    Object sender = getValue(kafkaProducer, "sender");
-    invoke(sender, "wakeup");
-    result.await();
+    Object transactionManager = getValue(kafkaProducer, "transactionManager");
+    Set<TopicPartition> newPartitionsInTransaction = (Set<TopicPartition>) getValue(transactionManager, "newPartitionsInTransaction");
+    if (!newPartitionsInTransaction.isEmpty()) {
+      TransactionalRequestResult result = enqueueNewPartitions();
+      Object sender = getValue(kafkaProducer, "sender");
+      invoke(sender, "wakeup");
+      result.await();
+    }
   }
 
   private synchronized TransactionalRequestResult enqueueNewPartitions() {
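
Two things change here. First, the private field the resume logic pokes at was renamed upstream (topicPartitionBookkeeper became txnPartitionMap), so the reflective lookup follows suit. Second, flushNewPartitions now checks newPartitionsInTransaction before enqueueing, presumably because with an empty set the 3.x client never sends the AddPartitionsToTxn request, leaving result.await() blocked. The getValue/invoke helpers are defined elsewhere in HiveKafkaProducer; a minimal sketch of what such helpers typically look like (names match the calls above, bodies are illustrative, not the actual Hive implementation):

import java.lang.reflect.Field;
import java.lang.reflect.Method;

final class ReflectionHelpersSketch {
  // Read a private field from the target object.
  static Object getValue(Object target, String fieldName) {
    try {
      Field field = target.getClass().getDeclaredField(fieldName);
      field.setAccessible(true);
      return field.get(target);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Incompatible KafkaProducer internals", e);
    }
  }

  // Invoke a (possibly private) method on the target object.
  static Object invoke(Object target, String methodName, Object... args) {
    try {
      Class<?>[] argTypes = new Class<?>[args.length];
      for (int i = 0; i < args.length; i++) {
        argTypes[i] = args[i].getClass();
      }
      Method method = target.getClass().getDeclaredMethod(methodName, argTypes);
      method.setAccessible(true);
      return method.invoke(target, args);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Incompatible KafkaProducer internals", e);
    }
  }
}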

kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java

Lines changed: 1 addition & 1 deletion
@@ -140,7 +140,7 @@ class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
       }
     } else {
       // case seek to beginning of stream
-      consumer.seekToBeginning(Collections.singleton(topicPartition));
+      consumer.seekToBeginning(topicPartitionList);
       // seekToBeginning is lazy thus need to call position() or poll(0)
       this.startOffset = consumer.position(topicPartition);
       LOG.info("Consumer at beginning of topic partition [{}], current start offset [{}]",
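
The seek now goes through the existing assignment list rather than a fresh singleton; the lazy-seek pattern itself is unchanged. For reference, a standalone sketch of that pattern (broker address and topic name are placeholders):

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LazySeekSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
    props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      TopicPartition tp = new TopicPartition("test_topic", 0); // placeholder topic
      List<TopicPartition> assignment = Collections.singletonList(tp);
      consumer.assign(assignment);
      consumer.seekToBeginning(assignment); // lazy: only records the intent to seek
      long startOffset = consumer.position(tp); // forces the seek to resolve
      System.out.println("start offset = " + startOffset);
    }
  }
}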

kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java

Lines changed: 10 additions & 3 deletions
@@ -25,6 +25,7 @@
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -158,7 +159,9 @@
   @Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongEpochAndId() {
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(3434L, (short) 12);
-    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+        new TopicPartition("dummy_topic", 0),
+        new OffsetAndMetadata(0L)), "__dummy_consumer_group");
     secondProducer.close(Duration.ZERO);
   }
 
@@ -169,7 +172,9 @@
     producer.close(Duration.ZERO);
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(pid, (short) 12);
-    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+        new TopicPartition("dummy_topic", 0),
+        new OffsetAndMetadata(0L)), "__dummy_consumer_group");
     secondProducer.close(Duration.ZERO);
   }
 
@@ -180,7 +185,9 @@
     producer.close(Duration.ZERO);
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(45L, epoch);
-    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+        new TopicPartition("dummy_topic", 0),
+        new OffsetAndMetadata(0L)), "__dummy_consumer_group");
     secondProducer.close(Duration.ZERO);
   }
 }
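
The move off ImmutableMap.of() looks behavioral rather than stylistic: on newer clients, sendOffsetsToTransaction with an empty offsets map appears to return without contacting the broker, so these negative tests need a real (if dummy) partition-to-offset entry for the expected fenced/invalid-epoch error to surface.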

kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java

Lines changed: 6 additions & 4 deletions
@@ -26,7 +26,7 @@
 import kafka.zk.EmbeddedZookeeper;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hive.common.IPStackUtils;
-import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.network.ConnectionMode;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestSslUtils;
 import org.junit.rules.ExternalResource;
@@ -41,6 +41,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
+import scala.Option;
 
 /**
  * Test Helper Class to start and stop a kafka broker.
@@ -106,7 +107,8 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
       brokerProps.setProperty("listener.name.l2.gssapi.sasl.jaas.config", jaasConfig);
       brokerProps.setProperty("listener.name.l3.gssapi.sasl.jaas.config", jaasConfig);
       truststoreFile = File.createTempFile("kafka_truststore", "jks");
-      brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(Mode.SERVER).createNewTrustStore(truststoreFile).build());
+      brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(ConnectionMode.SERVER)
+          .createNewTrustStore(truststoreFile).build());
       brokerProps.setProperty("delegation.token.master.key", "AnyValueShouldDoHereItDoesntMatter");
     }
     brokerProps.setProperty("offsets.topic.replication.factor", "1");
@@ -116,9 +118,9 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
     kafkaServer = TestUtils.createServer(config, Time.SYSTEM);
     kafkaServer.startup();
     kafkaServer.zkClient();
-    adminZkClient = new AdminZkClient(kafkaServer.zkClient());
+    adminZkClient = new AdminZkClient(kafkaServer.zkClient(), Option.empty());
     LOG.info("Creating kafka TOPIC [{}]", TOPIC);
-    adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+    adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$, false);
   }
 
   /**
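
The rest here is mechanical signature tracking against Kafka 3.x: Mode was renamed to ConnectionMode in the test SSL utilities, AdminZkClient now takes an extra Option<KafkaConfig> argument (Option.empty() preserves the old behavior), and createTopic takes a trailing usesTopicId flag, passed as false.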

kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java

Lines changed: 2 additions & 1 deletion
@@ -44,6 +44,7 @@
 
 import javax.annotation.Nullable;
 import java.nio.charset.Charset;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -304,7 +305,7 @@ private static void sendData(List<ConsumerRecord<byte[], byte[]>> recordList, @N
   @After public void tearDown() {
     this.kafkaRecordIterator = null;
     if (this.consumer != null) {
-      this.consumer.close();
+      this.consumer.close(Duration.ZERO);
     }
   }
 

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -172,7 +172,7 @@
     <junit.version>4.13.2</junit.version>
     <junit.jupiter.version>5.13.3</junit.jupiter.version>
     <junit.vintage.version>5.13.3</junit.vintage.version>
-    <kafka.version>2.5.0</kafka.version>
+    <kafka.version>3.9.1</kafka.version>
     <kryo.version>5.5.0</kryo.version>
     <reflectasm.version>1.11.9</reflectasm.version>
     <kudu.version>1.17.0</kudu.version>
