consumerRecords = consumer.poll(TimeUnit.SECONDS.toMillis(30));
+ messageIter = consumerRecords.iterator();
if (!messageIter.hasNext()) {
messageIter = null;
return false;
@@ -148,8 +127,8 @@ private boolean processFetchResponse(FetchResponse fetchResponse) {
* Closes this context
*/
public void close() throws IOException {
- if (simpleConsumer != null) {
- simpleConsumer.close();
+ if (consumer != null) {
+ consumer.close();
}
}
}
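For readers unfamiliar with the new consumer API that replaces SimpleConsumer in this file, below is a minimal, self-contained sketch of the poll-and-iterate pattern visible above, using the same poll(long) call. The broker address, topic, partition and starting offset are placeholder assumptions; the plugin itself builds its consumer from the KafkaRequest configuration rather than hard-coded values.

    // Sketch only: assign/seek/poll with the new consumer, matching the poll(long) usage above.
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class ConsumerPollSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        try (KafkaConsumer<byte[], byte[]> consumer =
               new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
          TopicPartition partition = new TopicPartition("users", 0);  // placeholder topic/partition
          consumer.assign(Collections.singletonList(partition));
          consumer.seek(partition, 0L);                               // start offset from the split
          ConsumerRecords<byte[], byte[]> records = consumer.poll(TimeUnit.SECONDS.toMillis(30));
          Iterator<ConsumerRecord<byte[], byte[]>> iter = records.iterator();
          while (iter.hasNext()) {
            System.out.println(iter.next().offset());
          }
        }
      }
    }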
diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java
index a6d87cb..1f2a43e 100644
--- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java
+++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java
@@ -81,8 +81,8 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
return false;
}
- key.set(request.getTopic(), request.getLeaderId(), request.getPartition(), request.getOffset(),
- request.getOffset(), 0);
+ key.set(request.getTopic(), request.getPartition(), request.getOffset(),
+ request.getOffset());
value = null;
if (reader != null) {
diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRequest.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRequest.java
index c00920e..02b2853 100644
--- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRequest.java
+++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRequest.java
@@ -16,46 +16,37 @@
package co.cask.hydrator.plugin.batchSource;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-import java.net.URI;
-import java.util.HashMap;
+import com.google.common.collect.ImmutableMap;
+
import java.util.Map;
/**
* A class that represents the kafka pull request.
- *
+ *
- * The class is a container for topic, leaderId, partition, uri and offset. It is
+ * The class is a container for the consumer configuration, topic, partition and offset. It is
* used in reading and writing the sequence files used for the extraction job.
- *
*/
public class KafkaRequest {
public static final long DEFAULT_OFFSET = 0;
+ private Map<String, String> conf;
private String topic = "";
- private String leaderId = "";
private int partition = 0;
- private URI uri = null;
private long offset = DEFAULT_OFFSET;
private long latestOffset = -1;
private long earliestOffset = -2;
private long avgMsgSize = 1024;
- public KafkaRequest(String topic, String leaderId, int partition, URI brokerUri) {
- this(topic, leaderId, partition, brokerUri, DEFAULT_OFFSET, -1);
+ public KafkaRequest(Map<String, String> conf, String topic, int partition) {
+ this(conf, topic, partition, DEFAULT_OFFSET, -1);
}
- public KafkaRequest(String topic, String leaderId, int partition, URI brokerUri, long offset, long latestOffset) {
+ public KafkaRequest(Map<String, String> conf, String topic, int partition, long offset, long latestOffset) {
+ this.conf = ImmutableMap.copyOf(conf);
this.topic = topic;
- this.leaderId = leaderId;
- this.uri = brokerUri;
this.partition = partition;
this.latestOffset = latestOffset;
setOffset(offset);
@@ -77,18 +68,14 @@ public void setOffset(long offset) {
this.offset = offset;
}
- public String getLeaderId() {
- return this.leaderId;
+ public Map<String, String> getConf() {
+ return conf;
}
public String getTopic() {
return this.topic;
}
- public URI getURI() {
- return this.uri;
- }
-
public int getPartition() {
return this.partition;
}
@@ -98,48 +85,11 @@ public long getOffset() {
}
public long getEarliestOffset() {
- if (this.earliestOffset == -2 && uri != null) {
- SimpleConsumer consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), 20000, 1024 * 1024, "client");
- Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetInfo = new HashMap<>();
- offsetInfo.put(new TopicAndPartition(topic, partition),
- new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
- OffsetResponse response =
- consumer.getOffsetsBefore(new OffsetRequest(offsetInfo, kafka.api.OffsetRequest.CurrentVersion(), "client"));
- long[] endOffset = response.offsets(topic, partition);
- if (endOffset.length == 0) {
- throw new RuntimeException("Could not find earliest offset for topic: " + topic +
- " and partition: " + partition);
- }
- consumer.close();
- this.earliestOffset = endOffset[0];
- return endOffset[0];
- } else {
- return this.earliestOffset;
- }
+ return this.earliestOffset;
}
public long getLastOffset() {
- if (this.latestOffset == -1 && uri != null)
- return getLastOffset(kafka.api.OffsetRequest.LatestTime());
- else {
- return this.latestOffset;
- }
- }
-
- private long getLastOffset(long time) {
- SimpleConsumer consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), 60000, 1024 * 1024, "client");
- Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetInfo = new HashMap<>();
- offsetInfo.put(new TopicAndPartition(topic, partition), new PartitionOffsetRequestInfo(time, 1));
- OffsetResponse response =
- consumer.getOffsetsBefore(new OffsetRequest(offsetInfo, kafka.api.OffsetRequest.CurrentVersion(), "client"));
- long[] endOffset = response.offsets(topic, partition);
- consumer.close();
- if (endOffset.length == 0) {
- throw new RuntimeException("Could not find latest offset for topic: " + topic +
- " and partition: " + partition);
- }
- this.latestOffset = endOffset[0];
- return endOffset[0];
+ return this.latestOffset;
}
public long estimateDataSize() {
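With the SimpleConsumer lookups removed, KafkaRequest is now a plain value object and the earliest/latest offsets have to be resolved before the request is built. A minimal sketch of one way to do that with the new consumer API follows; it assumes kafka-clients 0.10.1+ (which provides beginningOffsets/endOffsets), and the broker address, topic and partition are placeholders. The plugin's actual lookup happens in the input format, outside this hunk.

    // Sketch only: resolving the offset range for one partition with the new consumer API.
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class OffsetRangeSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        try (KafkaConsumer<byte[], byte[]> consumer =
               new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
          TopicPartition tp = new TopicPartition("users", 0);  // placeholder topic/partition
          long earliest = consumer.beginningOffsets(Collections.singletonList(tp)).get(tp);
          long latest = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
          // These values would be passed to the KafkaRequest constructor / setEarliestOffset.
          System.out.println("earliest=" + earliest + " latest=" + latest);
        }
      }
    }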
diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaSplit.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaSplit.java
index ce56ad5..b7313ff 100644
--- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaSplit.java
+++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaSplit.java
@@ -17,16 +17,16 @@
package co.cask.hydrator.plugin.batchSource;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
import javax.annotation.Nullable;
/**
@@ -46,34 +46,28 @@ public KafkaSplit(KafkaRequest request) {
@Override
public void readFields(DataInput in) throws IOException {
- String topic = in.readUTF();
- String leaderId = in.readUTF();
- String str = in.readUTF();
- URI uri = null;
- if (!str.isEmpty())
- try {
- uri = new URI(str);
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
- int partition = in.readInt();
- long offset = in.readLong();
- long latestOffset = in.readLong();
- request = new KafkaRequest(topic, leaderId, partition, uri, offset, latestOffset);
- length = request.estimateDataSize();
+ MapWritable mapWritable = new MapWritable();
+ mapWritable.readFields(in);
+ String topic = in.readUTF();
+ int partition = in.readInt();
+ long offset = in.readLong();
+ long latestOffset = in.readLong();
+ long earliestOffset = in.readLong();
+ Map<String, String> conf = writableToConf(mapWritable);
+ request = new KafkaRequest(conf, topic, partition, offset, latestOffset);
+ request.setEarliestOffset(earliestOffset);
+ length = request.estimateDataSize();
}
@Override
public void write(DataOutput out) throws IOException {
- out.writeUTF(request.getTopic());
- out.writeUTF(request.getLeaderId());
- if (request.getURI() != null)
- out.writeUTF(request.getURI().toString());
- else
- out.writeUTF("");
+ MapWritable conf = confToWritable(request.getConf());
+ conf.write(out);
+ out.writeUTF(request.getTopic());
out.writeInt(request.getPartition());
out.writeLong(request.getOffset());
out.writeLong(request.getLastOffset());
+ out.writeLong(request.getEarliestOffset());
}
@Override
@@ -92,4 +86,20 @@ public KafkaRequest popRequest() {
request = null;
return result;
}
+
+ private MapWritable confToWritable(Map<String, String> conf) {
+ MapWritable mapWritable = new MapWritable();
+ for (Map.Entry<String, String> entry : conf.entrySet()) {
+ mapWritable.put(new Text(entry.getKey()), new Text(entry.getValue()));
+ }
+ return mapWritable;
+ }
+
+ private Map<String, String> writableToConf(MapWritable mapWritable) {
+ Map<String, String> conf = new HashMap<>();
+ for (Map.Entry<Writable, Writable> entry : mapWritable.entrySet()) {
+ conf.put(entry.getKey().toString(), entry.getValue().toString());
+ }
+ return conf;
+ }
}
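The split now serializes the consumer configuration as a MapWritable, so write() and readFields() must stay symmetric: configuration first, then topic, partition and the three offsets. Below is a quick in-memory round-trip sketch of that encoding using Hadoop's DataOutputBuffer/DataInputBuffer; it mirrors confToWritable/writableToConf but is illustration only, not part of the plugin.

    // Sketch only: round-tripping a String map through MapWritable in memory.
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.Text;

    public class MapWritableRoundTrip {
      public static void main(String[] args) throws IOException {
        Map<String, String> conf = new HashMap<>();
        conf.put("bootstrap.servers", "localhost:9092");   // placeholder entry

        // Encode: Map<String, String> -> MapWritable -> bytes.
        MapWritable written = new MapWritable();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
          written.put(new Text(entry.getKey()), new Text(entry.getValue()));
        }
        DataOutputBuffer outBuffer = new DataOutputBuffer();
        written.write(outBuffer);

        // Decode: bytes -> MapWritable, as readFields() does on the task side.
        DataInputBuffer inBuffer = new DataInputBuffer();
        inBuffer.reset(outBuffer.getData(), outBuffer.getLength());
        MapWritable read = new MapWritable();
        read.readFields(inBuffer);

        System.out.println("bootstrap.servers -> " + read.get(new Text("bootstrap.servers")));
      }
    }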
diff --git a/src/main/java/co/cask/hydrator/plugin/sink/Kafka.java b/src/main/java/co/cask/hydrator/plugin/sink/Kafka.java
index 253d8c8..238873d 100644
--- a/src/main/java/co/cask/hydrator/plugin/sink/Kafka.java
+++ b/src/main/java/co/cask/hydrator/plugin/sink/Kafka.java
@@ -19,7 +19,6 @@
import co.cask.hydrator.common.ReferencePluginConfig;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
-import org.apache.avro.reflect.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
@@ -28,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
/**
* Kafka sink to write to Kafka
@@ -144,6 +144,16 @@ public static class Config extends ReferencePluginConfig {
@Nullable
private String kafkaProperties;
+ @Description("The kerberos principal used for the source.")
+ @Macro
+ @Nullable
+ private String principal;
+
+ @Description("The keytab location for the kerberos principal when kerberos security is enabled for kafka.")
+ @Macro
+ @Nullable
+ private String keytabLocation;
+
@Name("compressionType")
@Description("Additional kafka producer properties to set")
@Macro
@@ -171,10 +181,19 @@ private static class KafkaOutputFormatProvider implements OutputFormatProvider {
conf.put(BROKER_LIST, kafkaSinkConfig.brokers);
conf.put("compression.type", kafkaSinkConfig.compressionType);
- conf.put(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");
- conf.put(VAL_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");
+
addKafkaProperties(kafkaSinkConfig.kafkaProperties);
+ if (kafkaSinkConfig.principal != null && kafkaSinkConfig.keytabLocation != null) {
+ conf.put("additional." + "sasl.jaas.config",
+ String.format("com.sun.security.auth.module.Krb5LoginModule required \n" +
+ " useKeyTab=true \n" +
+ " storeKey=true \n" +
+ " useTicketCache=false \n" +
+ " keyTab=\"%s\" \n" +
+ " principal=\"%s\";", kafkaSinkConfig.keytabLocation,
+ kafkaSinkConfig.principal));
+ }
conf.put("async", kafkaSinkConfig.async);
if (kafkaSinkConfig.async.equalsIgnoreCase("true")) {
diff --git a/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java b/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java
index efc43c7..3f4451e 100644
--- a/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java
+++ b/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java
@@ -10,6 +10,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,8 +77,6 @@ public RecordWriter getRecordWriter(TaskAttemptContext context)
Properties props = new Properties();
// Configure the properties for kafka.
props.put(BROKER_LIST, configuration.get(BROKER_LIST));
- props.put(KEY_SERIALIZER, configuration.get(KEY_SERIALIZER));
- props.put(VAL_SERIALIZER, configuration.get(VAL_SERIALIZER));
props.put("compression.type", configuration.get("compression.type"));
if (!Strings.isNullOrEmpty(configuration.get("hasKey"))) {
@@ -102,7 +101,8 @@ public RecordWriter getRecordWriter(TaskAttemptContext context)
// CDAP-9178: cached the producer object to avoid being created on every batch interval
if (producer == null) {
- producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+ producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props, new StringSerializer(),
+ new StringSerializer());
}
return new KafkaRecordWriter(producer, topic);
diff --git a/src/main/java/co/cask/hydrator/plugin/sink/StringPartitioner.java b/src/main/java/co/cask/hydrator/plugin/sink/StringPartitioner.java
index 5cf38eb..6014a18 100644
--- a/src/main/java/co/cask/hydrator/plugin/sink/StringPartitioner.java
+++ b/src/main/java/co/cask/hydrator/plugin/sink/StringPartitioner.java
@@ -1,8 +1,12 @@
package co.cask.hydrator.plugin.sink;
import com.google.common.hash.Hashing;
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+
+import java.util.List;
+import java.util.Map;
/**
* String partitioner for kafka
@@ -10,12 +14,18 @@
@SuppressWarnings("UnusedDeclaration")
public final class StringPartitioner implements Partitioner {
- public StringPartitioner(VerifiableProperties props) {
+ @Override
+ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+ List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+ int numPartitions = partitions.size();
+ return Math.abs(Hashing.md5().hashString(key.toString()).asInt()) % numPartitions;
+ }
+ @Override
+ public void close() {
}
@Override
- public int partition(Object key, int numPartitions) {
- return Math.abs(Hashing.md5().hashString(key.toString()).asInt()) % numPartitions;
+ public void configure(Map<String, ?> map) {
}
}
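StringPartitioner now implements the new producer's Partitioner interface, so it is activated through the partitioner.class producer config rather than being constructed with VerifiableProperties. A minimal sketch of that wiring follows; the broker address, topic and record values are placeholders, and the plugin's real wiring lives in KafkaOutputFormat.

    // Sketch only: registering a custom Partitioner with the new producer via partitioner.class.
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class PartitionerWiringSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                  "co.cask.hydrator.plugin.sink.StringPartitioner");
        try (KafkaProducer<String, String> producer =
               new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
          // The partitioner hashes the record key, so records with the same key land on the same partition.
          producer.send(new ProducerRecord<>("users", "some-key", "1,samuel,jackson"));
        }
      }
    }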
diff --git a/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java b/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java
new file mode 100644
index 0000000..e03a301
--- /dev/null
+++ b/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java
@@ -0,0 +1,124 @@
+package co.cask.hydrator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.Time;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.twill.internal.utils.Networks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.net.BindException;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link com.google.common.util.concurrent.Service} implementation for running an instance of Kafka server in
+ * the same process.
+ */
+public final class EmbeddedKafkaServer extends AbstractIdleService {
+
+ public static final String START_RETRIES = "twill.kafka.start.timeout.retries";
+
+ private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
+ private static final String DEFAULT_START_RETRIES = "5";
+
+ private final int startTimeoutRetries;
+ private final Properties properties;
+ private KafkaServer server;
+
+ public EmbeddedKafkaServer(Properties properties) {
+ this.startTimeoutRetries = Integer.parseInt(properties.getProperty(START_RETRIES,
+ DEFAULT_START_RETRIES));
+ this.properties = new Properties();
+ this.properties.putAll(properties);
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ int tries = 0;
+ do {
+ KafkaConfig kafkaConfig = createKafkaConfig(properties);
+ KafkaServer kafkaServer = createKafkaServer(kafkaConfig);
+ try {
+ kafkaServer.startup();
+ server = kafkaServer;
+ } catch (Exception e) {
+ kafkaServer.shutdown();
+ kafkaServer.awaitShutdown();
+
+ Throwable rootCause = Throwables.getRootCause(e);
+ if (rootCause instanceof ZkTimeoutException) {
+ // Potentially caused by race condition bug described in TWILL-139.
+ LOG.warn("Timeout when connecting to ZooKeeper from KafkaServer. Attempt number {}.", tries, rootCause);
+ } else if (rootCause instanceof BindException) {
+ LOG.warn("Kafka failed to bind to port {}. Attempt number {}.", kafkaConfig.port(), tries, rootCause);
+ } else {
+ throw e;
+ }
+
+ // Do a random sleep of < 200ms
+ TimeUnit.MILLISECONDS.sleep(new Random().nextInt(200) + 1L);
+ }
+ } while (server == null && ++tries < startTimeoutRetries);
+
+ if (server == null) {
+ throw new IllegalStateException("Failed to start Kafka server after " + tries + " attempts.");
+ }
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ if (server != null) {
+ server.shutdown();
+ server.awaitShutdown();
+ }
+ }
+
+ private KafkaServer createKafkaServer(KafkaConfig kafkaConfig) {
+ return new KafkaServer(kafkaConfig, new Time() {
+
+ @Override
+ public long milliseconds() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public long nanoseconds() {
+ return System.nanoTime();
+ }
+
+ @Override
+ public void sleep(long ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ }
+ }
+ }, Option.apply("embedded-server"));
+ }
+
+ /**
+ * Creates a new {@link KafkaConfig} from the given {@link Properties}. If the {@code "port"} property is missing
+ * or is equals to {@code "0"}, a random port will be generated.
+ */
+ private KafkaConfig createKafkaConfig(Properties properties) {
+ Properties prop = new Properties();
+ prop.putAll(properties);
+
+ String port = prop.getProperty("port");
+ if (port == null || "0".equals(port)) {
+ int randomPort = Networks.getRandomPort();
+ Preconditions.checkState(randomPort > 0, "Failed to get random port.");
+ prop.setProperty("port", Integer.toString(randomPort));
+ }
+
+ return new KafkaConfig(prop);
+ }
+}
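The embedded server is driven through Guava's AbstractIdleService lifecycle. A minimal usage sketch follows, assuming an in-memory ZooKeeper already running at the given address and a scratch directory for the broker logs; the ZooKeeper address and log directory are placeholders. With newer Guava versions, startAsync().awaitRunning() and stopAsync().awaitTerminated() replace startAndWait()/stopAndWait().

    // Sketch only: starting and stopping the test EmbeddedKafkaServer.
    import co.cask.hydrator.EmbeddedKafkaServer;
    import java.util.Properties;

    public class EmbeddedKafkaServerUsageSketch {
      public static void main(String[] args) {
        String zkConnectionStr = "localhost:2181";                  // hypothetical ZooKeeper address
        Properties props = new Properties();
        props.setProperty("broker.id", "1");
        props.setProperty("port", "0");                             // 0 => a random free port is chosen
        props.setProperty("zookeeper.connect", zkConnectionStr);
        props.setProperty("log.dirs", "/tmp/embedded-kafka-logs");  // hypothetical data directory
        props.setProperty("num.partitions", "1");

        EmbeddedKafkaServer kafkaServer = new EmbeddedKafkaServer(props);
        kafkaServer.startAndWait();   // Guava Service: blocks until startUp() completes
        try {
          // ... produce and consume against the embedded broker ...
        } finally {
          kafkaServer.stopAndWait();
        }
      }
    }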
diff --git a/src/test/java/co/cask/hydrator/PipelineTest.java b/src/test/java/co/cask/hydrator/PipelineTest.java
index 2409d26..bc42363 100644
--- a/src/test/java/co/cask/hydrator/PipelineTest.java
+++ b/src/test/java/co/cask/hydrator/PipelineTest.java
@@ -44,7 +44,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import kafka.common.TopicAndPartition;
-import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
+import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
import org.apache.twill.internal.utils.Networks;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
@@ -90,7 +90,8 @@ public static void setupTestClass() throws Exception {
// this will make our plugins available to data-pipeline.
addPluginArtifact(NamespaceId.DEFAULT.artifact("example-plugins", "1.0.0"),
parentArtifact,
- KafkaBatchSource.class);
+ KafkaBatchSource.class,
+ RangeAssignor.class);
zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
zkServer.startAndWait();
@@ -124,6 +125,7 @@ public void testKafkaSource() throws Exception {
sourceProperties.put("referenceName", "kafkaTest");
sourceProperties.put("tableName", "testKafkaSource");
sourceProperties.put("topic", "users");
+ sourceProperties.put("maxNumberRecords", "3");
sourceProperties.put("schema", schema.toString());
sourceProperties.put("format", "csv");
ETLStage source =
@@ -145,6 +147,7 @@ public void testKafkaSource() throws Exception {
messages.put("a", "1,samuel,jackson");
messages.put("b", "2,dwayne,johnson");
messages.put("c", "3,christopher,walken");
+ messages.put("d", "4,donald,trump");
sendKafkaMessage("users", messages);
@@ -177,19 +180,21 @@ public void testKafkaSource() throws Exception {
Assert.assertEquals(3, Bytes.toLong(offset));
messages = new HashMap<>();
- messages.put("d", "4,samuel,jackson");
- messages.put("e", "5,dwayne,johnson");
+ messages.put("d", "5,samuel,jackson");
+ messages.put("e", "6,dwayne,johnson");
sendKafkaMessage("users", messages);
workflowManager.start();
TimeUnit.SECONDS.sleep(10);
workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 1, TimeUnit.MINUTES);
- final Map<Long, String> expected2 = ImmutableMap.of(
- 1L, "samuel jackson",
- 2L, "dwayne johnson",
- 3L, "christopher walken",
- 4L, "samuel jackson",
- 5L, "dwayne johnson"
- );
+ workflowManager.waitForRuns(ProgramRunStatus.COMPLETED, 2, 1, TimeUnit.MINUTES);
+ final Map<Long, String> expected2 = ImmutableMap.<Long, String>builder()
+ .put(1L, "samuel jackson")
+ .put(2L, "dwayne johnson")
+ .put(3L, "christopher walken")
+ .put(4L, "donald trump")
+ .put(5L, "samuel jackson")
+ .put(6L, "dwayne johnson")
+ .build();
outputRecords = new HashSet<>();
outputRecords.addAll(MockSink.readOutput(outputManager));
@@ -197,7 +202,7 @@ public void testKafkaSource() throws Exception {
for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) {
actual2.put((Long) outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last"));
}
- Assert.assertEquals(5, outputRecords.size());
+ Assert.assertEquals(6, outputRecords.size());
Assert.assertEquals(expected2, actual2);
}
diff --git a/widgets/Kafka-batchsink.json b/widgets/Kafka-batchsink.json
index ee33e85..4e8d141 100644
--- a/widgets/Kafka-batchsink.json
+++ b/widgets/Kafka-batchsink.json
@@ -50,6 +50,16 @@
"default": "none"
}
},
+ {
+ "widget-type": "textbox",
+ "label": "Kerberos Principal",
+ "name": "principal"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Keytab Location",
+ "name": "keytabLocation"
+ },
{
"widget-type": "keyvalue",
"label": "Additional Kafka Producer Properties",
diff --git a/widgets/Kafka-batchsource.json b/widgets/Kafka-batchsource.json
index 9c563e4..254a88b 100644
--- a/widgets/Kafka-batchsource.json
+++ b/widgets/Kafka-batchsource.json
@@ -30,6 +30,11 @@
"label": "Offset Table Name",
"name": "tableName"
},
+ {
+ "widget-type": "textbox",
+ "label": "Consumer Group Id",
+ "name": "consumerGroupId"
+ },
{
"widget-type": "csv",
"label": "Topic Partitions",
@@ -48,6 +53,11 @@
"value-placeholder": "Offset"
}
},
+ {
+ "widget-type": "textbox",
+ "label": "Max Number Records",
+ "name": "maxNumberRecords"
+ },
{
"widget-type": "textbox",
"label": "Key Field",
@@ -62,6 +72,26 @@
"widget-type": "textbox",
"label": "Offset Field",
"name": "offsetField"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Kerberos Principal",
+ "name": "principal"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Keytab Location",
+ "name": "keytabLocation"
+ },
+ {
+ "widget-type": "keyvalue",
+ "label": "Additional Kafka Consumer Properties",
+ "name": "kafkaProperties",
+ "widget-attributes": {
+ "showDelimiter": "false",
+ "key-placeholder": "Kafka consumer property",
+ "value-placeholder": "Kafka consumer property value"
+ }
}
]
},