diff --git a/src/main/java/com/pushpendersaini/helloworld/common/PluginConstants.java b/src/main/java/com/pushpendersaini/helloworld/common/PluginConstants.java index deb5ed1..47d9502 100644 --- a/src/main/java/com/pushpendersaini/helloworld/common/PluginConstants.java +++ b/src/main/java/com/pushpendersaini/helloworld/common/PluginConstants.java @@ -12,7 +12,12 @@ public final class PluginConstants { public static final String PROPERTY_CONFIG_KEY_FREQUENCY = "cdap.hello.world.config.frequency"; public static final int PROPERTY_DEFAULT_FREQUENCY = 1; - // Output + + public static final String PROPERTY_NAME_USER_MESSAGE="User Message"; + public static final String PROPERTY_CONFIG_KEY_USER_MESSAGE = "cdap.hello.world.config.message"; + public static final String PROPERTY_CONFIG_DEFAULT_MESSAGE = "Hello world, this is a custom message"; + + public static final String PLUGIN_OUT_VALUE = "Hello World!"; public static final Schema PLUGIN_OUT_SCHEMA = Schema.recordOf("data", Schema.Field.of("message", Schema.of(Schema.Type.STRING))); } diff --git a/src/main/java/com/pushpendersaini/helloworld/source/HelloWorldBatchSourceConfig.java b/src/main/java/com/pushpendersaini/helloworld/source/HelloWorldBatchSourceConfig.java index e5a7cad..298d97b 100644 --- a/src/main/java/com/pushpendersaini/helloworld/source/HelloWorldBatchSourceConfig.java +++ b/src/main/java/com/pushpendersaini/helloworld/source/HelloWorldBatchSourceConfig.java @@ -6,12 +6,21 @@ import io.cdap.cdap.api.plugin.PluginConfig; import io.cdap.cdap.etl.api.FailureCollector; +import javax.annotation.Nullable; + public class HelloWorldBatchSourceConfig extends PluginConfig { @Name(PluginConstants.PROPERTY_NAME_FREQUENCY) @Description("Number of times the plugin says hello world.") public Integer frequency; + + @Name(PluginConstants.PROPERTY_NAME_USER_MESSAGE) + @Description("Custom message") + @Nullable + public String message; + + public void validate(FailureCollector failureCollector) { if (frequency != null && frequency < 1) { failureCollector.addFailure("Property cannot be lower than 1.", "Use a frequency value of equal to or more than 1.").withConfigProperty(PluginConstants.PROPERTY_NAME_FREQUENCY); @@ -21,5 +30,6 @@ public void validate(FailureCollector failureCollector) { public int getFrequency() { return frequency == null ? PluginConstants.PROPERTY_DEFAULT_FREQUENCY : frequency; } + public String getMessage(){ return (message == null || message.isEmpty()) ? PluginConstants.PROPERTY_CONFIG_DEFAULT_MESSAGE: message; } } diff --git a/src/main/java/com/pushpendersaini/helloworld/source/HelloWorldInputFormatProvider.java b/src/main/java/com/pushpendersaini/helloworld/source/HelloWorldInputFormatProvider.java index f7ec938..3156843 100644 --- a/src/main/java/com/pushpendersaini/helloworld/source/HelloWorldInputFormatProvider.java +++ b/src/main/java/com/pushpendersaini/helloworld/source/HelloWorldInputFormatProvider.java @@ -12,7 +12,10 @@ public class HelloWorldInputFormatProvider implements InputFormatProvider { public HelloWorldInputFormatProvider(HelloWorldBatchSourceConfig config) { configMap = new HashMap<>(); + configMap.put(PluginConstants.PROPERTY_CONFIG_KEY_FREQUENCY, Integer.toString(config.getFrequency())); + configMap.put(PluginConstants.PROPERTY_CONFIG_KEY_MESSAGE, config.getMessage()); + } @Override diff --git a/src/main/java/com/pushpendersaini/helloworld/source/HelloWorldRecordReader.java b/src/main/java/com/pushpendersaini/helloworld/source/HelloWorldRecordReader.java index c21dbed..69b1a9d 100644 --- a/src/main/java/com/pushpendersaini/helloworld/source/HelloWorldRecordReader.java +++ b/src/main/java/com/pushpendersaini/helloworld/source/HelloWorldRecordReader.java @@ -13,12 +13,18 @@ public class HelloWorldRecordReader extends RecordReader { private int frequency; private int countProcessed = 0; + private String message; @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { Configuration conf = taskAttemptContext.getConfiguration(); // Plugin configuration + frequency = conf.getInt(PluginConstants.PROPERTY_CONFIG_KEY_FREQUENCY, 1); + + message = conf.get(PluginConstants.PROPERTY_CONFIG_KEY_MESSAGE,PluginConstants.PROPERTY_CONFIG_DEFAULT_MESSAGE); + + } @Override