diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GCSTest.java b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GCSTest.java
index 8103a5bf4..4d3f400ff 100644
--- a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GCSTest.java
+++ b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GCSTest.java
@@ -18,11 +18,13 @@
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Bucket;
 import com.google.cloud.storage.BucketInfo;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Files;
 import com.google.gson.Gson;
@@ -77,9 +79,12 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
 
 /**
  * Tests reading from GCS (Google Cloud Storage) and writing to GCS from within a Dataproc cluster.
+ *
+ * TODO: PLUGIN-553 - the GCSMultiFiles sink plugin's 'format' property does not work when set through a macro.
  */
 public class GCSTest extends DataprocETLTestBase {
 
@@ -90,6 +95,7 @@
   private static final String GCS_MOVE_PLUGIN_NAME = "GCSMove";
   private static final String GCS_COPY_PLUGIN_NAME = "GCSCopy";
   private static final String SINK_PLUGIN_NAME = "GCS";
+  private static final String MULTI_SINK_PLUGIN_NAME = "GCSMultiFiles";
   private static final String SOURCE_PLUGIN_NAME = "GCSFile";
   private static final Schema ALL_DT_SCHEMA = Schema.recordOf(
     "record",
@@ -118,6 +124,8 @@
   private static Storage storage;
   private List<String> markedForDeleteBuckets;
+  private static final String CSV_CONTENT_TYPE = "text/csv";
+  private static final String MULTISINK_RUNTIME_ARG = "multisink.%s";
 
   @BeforeClass
   public static void testClassSetup() throws IOException {
@@ -187,6 +195,10 @@ private String createPath(Bucket bucket, String blobName) {
     return String.format("gs://%s/%s", bucket.getName(), blobName);
   }
 
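+  /**
+   * Helper that creates a blob at {@code inputPath} in the given bucket, with the given lines
+   * joined by newlines as its content.
+   */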
+  private void insertData(Bucket bucket, String inputPath, String... data) {
+    bucket.create(inputPath, String.join("\n", data).getBytes(StandardCharsets.UTF_8));
+  }
+
   @Test
   public void testGCSCopy() throws Exception {
     String prefix = "cdap-gcs-cp-test";
@@ -707,7 +719,7 @@ public void testGcsSourceFormats() throws Exception {
     String line2 = "2,Terry Perez,tperez1@example.com";
     String line3 = "3,Jack Ferguson,jferguson2@example.com";
     String inputPath = "input";
-    bucket.create(inputPath, String.join("\n", Arrays.asList(line1, line2, line3)).getBytes(StandardCharsets.UTF_8));
+    insertData(bucket, inputPath, line1, line2, line3);
 
     String suffix = UUID.randomUUID().toString();
     /*
@@ -761,7 +773,8 @@
         id,first,last,email,address,city,state,zip
         1,Marilyn,Hawkins,mhawkins0@ted.com,238 Melvin Way,Palo Alto,CA,94302
      */
-    ETLStage sink = new ETLStage("sink", createSinkPlugin("csv", createPath(bucket, "output"), schema));
+    ETLStage sink = new ETLStage("sink", createSinkPlugin("csv", createPath(bucket, "output"),
+                                                          schema, CSV_CONTENT_TYPE));
     pipelineConfig = ETLBatchConfig.builder().addStage(sink);
     for (String format : formats) {
       String path = String.format("%s/%s", createPath(bucket, OUTPUT_BLOB_NAME), format);
@@ -776,6 +789,7 @@
 
     Map<String, Integer> lineCounts = new HashMap<>();
     List<String> results = getResultBlobsContent(bucket, "output");
+    List<String> resultBlobsContentType = getResultBlobsContentType(bucket, "output");
     for (String result : results) {
       for (String line : result.split("\n")) {
         lineCounts.putIfAbsent(line, 0);
@@ -787,6 +801,71 @@
     expected.put(line2, formats.size());
     expected.put(line3, formats.size());
     Assert.assertEquals(expected, lineCounts);
+    Assert.assertEquals(CSV_CONTENT_TYPE, resultBlobsContentType.get(0));
   }
+
+  @Test
+  public void testMultiSinkContentType() throws Exception {
+    String bucketName = "cask-gcs-multisink-" + UUID.randomUUID().toString();
+    Bucket bucket = createBucket(bucketName);
+
+    Schema schema = Schema.recordOf("customer",
+                                    Schema.Field.of("id", Schema.of(Schema.Type.INT)),
+                                    Schema.Field.of("name", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+                                    Schema.Field.of("email", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+                                    Schema.Field.of("department", Schema.nullableOf(Schema.of(Schema.Type.STRING))));
+
+    Schema outputSchema = Schema.recordOf("output.schema",
+                                          Schema.Field.of("id", Schema.of(Schema.Type.INT)),
+                                          Schema.Field.of("name", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+                                          Schema.Field.of("email", Schema.nullableOf(Schema.of(Schema.Type.STRING))));
+
+    String line1 = "1,Marilyn Hawkins,mhawkins0@example.com,DepartmentA";
+    String line2 = "2,Terry Perez,tperez1@example.com,DepartmentB";
+    String line3 = "3,Jack Ferguson,jferguson2@example.com,DepartmentA";
+    String inputPath = "input";
+    insertData(bucket, inputPath, line1, line2, line3);
+
+    Map<String, String> inputSourceConfig = new HashMap<>();
+    inputSourceConfig.put("schema", schema.toString());
+    inputSourceConfig.put("format", "${sourceFormat}");
+    inputSourceConfig.put("referenceName", "source_" + UUID.randomUUID().toString());
+    inputSourceConfig.put("project", getProjectId());
+    inputSourceConfig.put("path", createPath(bucket, inputPath));
+    ETLStage source = new ETLStage("source",
+                                   new ETLPlugin(SOURCE_PLUGIN_NAME,
+                                                 BatchSource.PLUGIN_TYPE,
+                                                 inputSourceConfig,
+                                                 GOOGLE_CLOUD_ARTIFACT));
+
+    ETLBatchConfig.Builder pipelineConfig = ETLBatchConfig.builder().addStage(source);
+
+    String path = createPath(bucket, OUTPUT_BLOB_NAME);
+    ETLStage sink = new ETLStage("multisink", createMultiSinkPlugin("csv"));
+    pipelineConfig.addStage(sink).addConnection(source.getName(), sink.getName());
+
+    AppRequest<ETLBatchConfig> appRequest = getBatchAppRequestV2(pipelineConfig.build());
+    ApplicationId appId = TEST_NAMESPACE.app("GCSMultiSinkContentType");
+    ApplicationManager appManager = deployApplication(appId, appRequest);
+
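+    // The multi sink splits records on the value of the split field and writes each split under
+    // '<path>/<value>'; each 'multisink.<value>' runtime argument supplies that split's output schema.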
+    String multisink1 = String.format(MULTISINK_RUNTIME_ARG, "DepartmentA");
+    String multisink2 = String.format(MULTISINK_RUNTIME_ARG, "DepartmentB");
+    Map<String, String> args = new HashMap<>();
+    args.put(multisink1, outputSchema.toString());
+    args.put(multisink2, outputSchema.toString());
+    args.put("sourceFormat", "csv");
+    args.put("multiSinkPath", path);
+    args.put("multiSinkProjectId", getProjectId());
+    args.put("multiSinkSchema", schema.toString());
+    args.put("multiSinkSplitField", "department");
+    args.put("contentType", CSV_CONTENT_TYPE);
+    startWorkFlow(appManager, ProgramRunStatus.COMPLETED, args);
+
+    List<String> multisinkContentType1 = getResultBlobsContentType(bucket, OUTPUT_BLOB_NAME + "/DepartmentA");
+    List<String> multisinkContentType2 = getResultBlobsContentType(bucket, OUTPUT_BLOB_NAME + "/DepartmentB");
+    Assert.assertEquals(CSV_CONTENT_TYPE, multisinkContentType1.get(0));
+    Assert.assertEquals(CSV_CONTENT_TYPE, multisinkContentType2.get(0));
+  }
 
   private ETLStage createSourceStage(String format, String path, String regex, Schema schema) {
@@ -803,14 +882,32 @@ private ETLStage createSourceStage(String format, String path, String regex, Schema schema) {
   }
 
   private ETLPlugin createSinkPlugin(String format, String path, Schema schema) {
-    return new ETLPlugin(SINK_PLUGIN_NAME, BatchSink.PLUGIN_TYPE,
-                         ImmutableMap.of(
-                           "path", path,
-                           "format", format,
-                           "project", getProjectId(),
-                           "referenceName", format,
-                           "schema", schema.toString()),
-                         GOOGLE_CLOUD_ARTIFACT);
+    return createSinkPlugin(format, path, schema, null);
+  }
+
+  private ETLPlugin createSinkPlugin(String format, String path, Schema schema, @Nullable String contentType) {
+    ImmutableMap.Builder<String, String> propertyBuilder = new ImmutableMap.Builder<String, String>()
+      .put("path", path)
+      .put("format", format)
+      .put("project", getProjectId())
+      .put("referenceName", format)
+      .put("schema", schema.toString());
+    if (!Strings.isNullOrEmpty(contentType)) {
+      propertyBuilder.put("contentType", contentType);
+    }
+    return new ETLPlugin(SINK_PLUGIN_NAME, BatchSink.PLUGIN_TYPE, propertyBuilder.build(), GOOGLE_CLOUD_ARTIFACT);
+  }
+
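+  /**
+   * Builds the GCSMultiFiles sink. Every property except 'format' is a macro resolved from the
+   * runtime arguments; 'format' is passed in directly because a macro there does not work yet
+   * (see PLUGIN-553 in the class javadoc).
+   */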
+  private ETLPlugin createMultiSinkPlugin(String sinkFormat) {
+    Map<String, String> map = new HashMap<>();
+    map.put("path", "${multiSinkPath}");
+    map.put("format", sinkFormat);
+    map.put("project", "${multiSinkProjectId}");
+    map.put("schema", "${multiSinkSchema}");
+    map.put("referenceName", "gcs-multi-input");
+    map.put("splitField", "${multiSinkSplitField}");
+    map.put("contentType", "${contentType}");
+    return new ETLPlugin(MULTI_SINK_PLUGIN_NAME, BatchSink.PLUGIN_TYPE, map, GOOGLE_CLOUD_ARTIFACT);
   }
 
   static class DataTypesRecord {
@@ -881,4 +978,19 @@ private static String blobContentToString(Blob blob) {
     return null;
   }
 
+  /**
+   * Returns the content types of the output files under {@code path}, skipping the _SUCCESS marker and directories.
+   */
+  private List<String> getResultBlobsContentType(Bucket bucket, String path) {
+    String successFile = path + "/_SUCCESS";
+    assertExists(bucket, successFile);
+
+    return StreamSupport.stream(bucket.list().iterateAll().spliterator(), false)
+      .filter(blob -> blob.getName().startsWith(path + "/")
+                && !successFile.equals(blob.getName()) && !blob.getName().endsWith("/"))
+      .map(BlobInfo::getContentType)
+      .filter(Objects::nonNull)
+      .collect(Collectors.toList());
+  }
+
 }