Plugin-498: Added content-type to GCS sink #462
Changes from all commits
GCSBatchSink.java
@@ -67,6 +67,7 @@ public class GCSBatchSink extends AbstractFileSink<GCSBatchSink.GCSBatchSinkConf
   private static final String RECORDS_UPDATED_METRIC = "records.updated";
   public static final String AVRO_NAMED_OUTPUT = "avro.mo.config.namedOutput";
   public static final String COMMON_NAMED_OUTPUT = "mapreduce.output.basename";
+  public static final String CONTENT_TYPE = "io.cdap.gcs.batch.sink.content.type";

   private final GCSBatchSinkConfig config;
   private String outputPath;
@@ -125,6 +126,7 @@ public void prepareRun(BatchSinkContext context) throws Exception {
   @Override
   protected Map<String, String> getFileSystemProperties(BatchSinkContext context) {
     Map<String, String> properties = GCPUtils.getFileSystemProperties(config, config.getPath(), new HashMap<>());
+    properties.put(GCSBatchSink.CONTENT_TYPE, config.getContentType());
     properties.putAll(config.getFileSystemProperties());
     String outputFileBaseName = config.getOutputFileNameBase();
     if (outputFileBaseName == null || outputFileBaseName.isEmpty()) {
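The added line is how the resolved content type travels from configure time to the output phase: it is stored in the Hadoop Configuration under the CONTENT_TYPE key and read back by the committer at commit time (see GCSOutputCommitter below). A minimal sketch of that round trip, assuming hadoop-common on the classpath; the class name and values are illustrative:

import org.apache.hadoop.conf.Configuration;

public class ContentTypeRoundTrip {
  // Same key the sink writes; the committer reads it back at commit time.
  static final String CONTENT_TYPE = "io.cdap.gcs.batch.sink.content.type";

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(CONTENT_TYPE, "application/json");  // what getFileSystemProperties effectively does
    String resolved = conf.get(CONTENT_TYPE);    // what GCSOutputCommitter later reads
    System.out.println(resolved);                // -> application/json
  }
}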
@@ -242,6 +244,23 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements
     private static final String NAME_LOCATION = "location";
     private static final String NAME_FS_PROPERTIES = "fileSystemProperties";
     private static final String NAME_FILE_NAME_BASE = "outputFileNameBase";
+    private static final String NAME_CONTENT_TYPE = "contentType";
+    private static final String NAME_CUSTOM_CONTENT_TYPE = "customContentType";
+    private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
+    private static final String CONTENT_TYPE_OTHER = "other";
+    private static final String CONTENT_TYPE_APPLICATION_JSON = "application/json";
+    private static final String CONTENT_TYPE_APPLICATION_AVRO = "application/avro";
+    private static final String CONTENT_TYPE_APPLICATION_CSV = "application/csv";
+    private static final String CONTENT_TYPE_TEXT_PLAIN = "text/plain";
+    private static final String CONTENT_TYPE_TEXT_CSV = "text/csv";
+    private static final String CONTENT_TYPE_TEXT_TSV = "text/tab-separated-values";
+    private static final String FORMAT_AVRO = "avro";
+    private static final String FORMAT_CSV = "csv";
+    private static final String FORMAT_JSON = "json";
+    private static final String FORMAT_TSV = "tsv";
+    private static final String FORMAT_DELIMITED = "delimited";
+    private static final String FORMAT_ORC = "orc";
+    private static final String FORMAT_PARQUET = "parquet";

     private static final String SCHEME = "gs://";
     @Name(NAME_PATH)
@@ -280,6 +299,18 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements
                  "This value is ignored if the bucket already exists")
     protected String location;

+    @Macro
+    @Description("The Content Type property is used to indicate the media type of the resource. " +
+                   "Defaults to 'application/octet-stream'.")
+    @Nullable
+    protected String contentType;
+
+    @Macro
+    @Description("The Custom Content Type is used when the value of Content-Type is set to other. " +
+                   "The user can provide a specific Content-Type, different from the options in the dropdown.")
+    @Nullable
+    protected String customContentType;
+
     @Name(NAME_FS_PROPERTIES)
     @Macro
     @Nullable
@@ -326,10 +357,19 @@ public void validate(FailureCollector collector) {
         }
       }

+      if (!containsMacro(NAME_CONTENT_TYPE) && !containsMacro(NAME_CUSTOM_CONTENT_TYPE)
+          && !Strings.isNullOrEmpty(contentType) && !contentType.equalsIgnoreCase(CONTENT_TYPE_OTHER)
+          && !containsMacro(NAME_FORMAT)) {
+        if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
+          validateContentType(collector);
+        }
+      }
+
       try {
         getSchema();
       } catch (IllegalArgumentException e) {
-        collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_SCHEMA).withStacktrace(e.getStackTrace());
+        collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_SCHEMA)
+          .withStacktrace(e.getStackTrace());
       }

       try {
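The new guard only runs content-type validation when none of contentType, customContentType, and format is an unresolved macro, since macro values are only known at runtime. A self-contained sketch of that deferral pattern; containsMacro here is an illustrative stand-in for the CDAP PluginConfig method, not the plugin's code:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class MacroGuard {
  // Illustrative stand-in: pretend contentType still holds an unexpanded ${...} macro.
  private static final Set<String> UNRESOLVED = new HashSet<>(Arrays.asList("contentType"));

  static boolean containsMacro(String property) {
    return UNRESOLVED.contains(property);
  }

  public static void main(String[] args) {
    // Mirrors the validate() guard: only check values that are already concrete.
    if (!containsMacro("contentType") && !containsMacro("customContentType") && !containsMacro("format")) {
      System.out.println("validate content type now");
    } else {
      System.out.println("defer validation until macros are evaluated at runtime");
    }
  }
}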
@@ -340,6 +380,69 @@ public void validate(FailureCollector collector) {
       }
     }

+    // This method validates the specified content type for the used format.
+    public void validateContentType(FailureCollector failureCollector) {
+      switch (format) {
+        case FORMAT_AVRO:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_AVRO)) {
+            failureCollector.addFailure(String.format("Valid content types for avro are %s, %s.",
+              CONTENT_TYPE_APPLICATION_AVRO, DEFAULT_CONTENT_TYPE), null)
+              .withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_JSON:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_JSON)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for json are %s, %s, %s.", CONTENT_TYPE_APPLICATION_JSON,
+              CONTENT_TYPE_TEXT_PLAIN, DEFAULT_CONTENT_TYPE), null
+            ).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_CSV:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for csv are %s, %s, %s, %s.", CONTENT_TYPE_APPLICATION_CSV,
+              CONTENT_TYPE_TEXT_PLAIN, CONTENT_TYPE_TEXT_CSV, DEFAULT_CONTENT_TYPE), null
+            ).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_DELIMITED:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_TSV)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for delimited are %s, %s, %s, %s, %s.", CONTENT_TYPE_TEXT_PLAIN,
+              CONTENT_TYPE_TEXT_CSV, CONTENT_TYPE_APPLICATION_CSV, CONTENT_TYPE_TEXT_TSV, DEFAULT_CONTENT_TYPE), null
+            ).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_PARQUET:
+          if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
+            failureCollector.addFailure(String.format("Valid content type for parquet is %s.", DEFAULT_CONTENT_TYPE),
+              null).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_ORC:
+          if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
+            failureCollector.addFailure(String.format("Valid content type for orc is %s.", DEFAULT_CONTENT_TYPE),
+              null).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_TSV:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_TSV)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for tsv are %s, %s, %s.", CONTENT_TYPE_TEXT_TSV, CONTENT_TYPE_TEXT_PLAIN,
+              DEFAULT_CONTENT_TYPE), null).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+      }
+    }
+
     public String getBucket() {
       return GCSPath.from(path).getBucket();
     }
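The switch above encodes a fixed allow-list per format, with application/octet-stream accepted for every format. The same rules can be expressed as a lookup table; the following standalone sketch is illustrative and is not the plugin's API:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ContentTypeRules {
  static final String DEFAULT = "application/octet-stream";
  // Allowed content types per format, as enforced by validateContentType.
  static final Map<String, Set<String>> ALLOWED = new HashMap<>();
  static {
    ALLOWED.put("avro", setOf("application/avro"));
    ALLOWED.put("json", setOf("application/json", "text/plain"));
    ALLOWED.put("csv", setOf("application/csv", "text/csv", "text/plain"));
    ALLOWED.put("delimited", setOf("text/plain", "text/csv", "application/csv", "text/tab-separated-values"));
    ALLOWED.put("tsv", setOf("text/tab-separated-values", "text/plain"));
    ALLOWED.put("orc", setOf());
    ALLOWED.put("parquet", setOf());
  }

  static boolean isValid(String format, String contentType) {
    // The default type always passes; the caller skips validation for it.
    if (contentType.equalsIgnoreCase(DEFAULT)) {
      return true;
    }
    return ALLOWED.getOrDefault(format, setOf()).contains(contentType.toLowerCase());
  }

  private static Set<String> setOf(String... values) {
    return new HashSet<>(Arrays.asList(values));
  }

  public static void main(String[] args) {
    System.out.println(isValid("json", "text/plain"));  // true
    System.out.println(isValid("parquet", "text/csv")); // false
  }
}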
@@ -383,6 +486,30 @@ public String getLocation() {
       return location;
     }

+    /* This method gets the value of content type. Valid content types for each format are:
+     *
+     * avro -> application/avro, application/octet-stream
+     * json -> application/json, text/plain, application/octet-stream
+     * csv -> application/csv, text/csv, text/plain, application/octet-stream
+     * delimited -> application/csv, text/csv, text/plain, text/tab-separated-values, application/octet-stream
+     * orc -> application/octet-stream
+     * parquet -> application/octet-stream
+     * tsv -> text/tab-separated-values, text/plain, application/octet-stream
+     */
+    @Nullable
+    public String getContentType() {
+      if (!Strings.isNullOrEmpty(contentType)) {
+        if (contentType.equals(CONTENT_TYPE_OTHER)) {
+          if (Strings.isNullOrEmpty(customContentType)) {
+            return DEFAULT_CONTENT_TYPE;
+          }
+          return customContentType;
+        }
+        return contentType;
+      }
+      return DEFAULT_CONTENT_TYPE;
+    }
+
     public Map<String, String> getFileSystemProperties() {
       if (fileSystemProperties == null || fileSystemProperties.isEmpty()) {
         return Collections.emptyMap();

Review comments on getContentType: if the value of contentType or customContentType is a macro, the value that was specified by the user is used as-is.
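The resolution order in getContentType (explicit type, then the custom value when "other" is selected, then the default) can be exercised in isolation. A minimal sketch mirroring that branching, using Guava's Strings as the PR does; the class and method names are illustrative:

import com.google.common.base.Strings;

public class ContentTypeResolver {
  static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
  static final String CONTENT_TYPE_OTHER = "other";

  // Mirrors getContentType(): "other" defers to the custom value,
  // and anything unset falls back to the default.
  static String resolve(String contentType, String customContentType) {
    if (!Strings.isNullOrEmpty(contentType)) {
      if (contentType.equals(CONTENT_TYPE_OTHER)) {
        return Strings.isNullOrEmpty(customContentType) ? DEFAULT_CONTENT_TYPE : customContentType;
      }
      return contentType;
    }
    return DEFAULT_CONTENT_TYPE;
  }

  public static void main(String[] args) {
    System.out.println(resolve("application/json", null)); // application/json
    System.out.println(resolve("other", "text/markdown")); // text/markdown
    System.out.println(resolve("other", ""));              // application/octet-stream
    System.out.println(resolve(null, null));               // application/octet-stream
  }
}

As the review comments note, if contentType or customContentType were still an unresolved macro at this point, its literal text would flow through unchanged, which appears to be why validate() skips the content-type checks when macros are present.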
GCSOutputCommitter.java (new file)

@@ -0,0 +1,165 @@
/*
 * Copyright © 2015-2020 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package io.cdap.plugin.gcp.gcs.sink;

import com.google.cloud.storage.Blob;
import com.google.common.annotations.VisibleForTesting;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.StorageClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * OutputCommitter for GCS
 */
public class GCSOutputCommitter extends OutputCommitter {

  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputFormatProvider.class);
  public static final String RECORD_COUNT_FORMAT = "recordcount.%s";

  private final OutputCommitter delegate;

  public GCSOutputCommitter(OutputCommitter delegate) {
    this.delegate = delegate;
  }

  @Override
  public void setupJob(JobContext jobContext) throws IOException {
    delegate.setupJob(jobContext);
  }

  @Override
  public void cleanupJob(JobContext jobContext) throws IOException {
    delegate.cleanupJob(jobContext);
  }

  @Override
  public void commitJob(JobContext jobContext) throws IOException {
    delegate.commitJob(jobContext);
  }

  @Override
  public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
    delegate.abortJob(jobContext, state);
  }

  @Override
  public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
    delegate.setupTask(taskAttemptContext);
  }

  @Override
  public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
    return delegate.needsTaskCommit(taskAttemptContext);
  }

  @Override
  public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
    /* On commit task, there seems to be some inconsistency across different hadoop implementations regarding the
     path where the output file is stored. For some implementations it appears in the path returned by
     FileOutputCommitter getCommittedTaskPath and for some it does not. Before commit, the files appear to be
     consistently present in the path returned by FileOutputCommitter getTaskAttemptPath. Hence, find the output file
     from taskAttemptPath and add metadata before commit happens. After commit, the file would have been moved out of
     the taskAttemptPath. */
    try {
      updateMetricMetaData(taskAttemptContext);
    } catch (Exception exception) {
      LOG.warn("Unable to record metric for task. Metric emitted for the number of affected rows may be incorrect.",
               exception);
    }

    delegate.commitTask(taskAttemptContext);
  }

  private void updateMetricMetaData(TaskAttemptContext taskAttemptContext) throws IOException {
    if (!(delegate instanceof FileOutputCommitter)) {
      return;
    }

    FileOutputCommitter fileOutputCommitter = (FileOutputCommitter) delegate;
    Configuration configuration = taskAttemptContext.getConfiguration();
    // Task is not yet committed, so the file should be available in the attempt path.
    Path taskAttemptPath = fileOutputCommitter.getTaskAttemptPath(taskAttemptContext);
    if (configuration == null || taskAttemptPath == null) {
      return;
    }

    // Read the record count from the configuration.
    String keyInConfig = String.format(RECORD_COUNT_FORMAT, taskAttemptContext.getTaskAttemptID());
    Map<String, String> metaData = new HashMap<>();
    metaData.put(GCSBatchSink.RECORD_COUNT, String.valueOf(configuration.getLong(keyInConfig, 0L)));
    StorageClient storageClient = getStorageClient(configuration);
    // Update metadata on the output file present in the directory for this task.
    Blob blob = storageClient.pickABlob(taskAttemptPath.toString());
    if (blob == null) {
      LOG.info("Could not find a file in path {} to apply count metadata.", taskAttemptPath.toString());
      return;
    }
    blob.toBuilder().setContentType(configuration.get(GCSBatchSink.CONTENT_TYPE)).setMetadata(metaData).build()
      .update();
  }

  @VisibleForTesting
  StorageClient getStorageClient(Configuration configuration) throws IOException {
    String project = configuration.get(GCPUtils.FS_GS_PROJECT_ID);
    String serviceAccount = null;
    boolean isServiceAccountFile = GCPUtils.SERVICE_ACCOUNT_TYPE_FILE_PATH
      .equals(configuration.get(GCPUtils.SERVICE_ACCOUNT_TYPE));
    if (isServiceAccountFile) {
      serviceAccount = configuration.get(GCPUtils.CLOUD_JSON_KEYFILE, null);
    } else {
      serviceAccount = configuration.get(String.format("%s.%s", GCPUtils.CLOUD_JSON_KEYFILE_PREFIX,
                                                       GCPUtils.CLOUD_ACCOUNT_JSON_SUFFIX));
    }
    return StorageClient.create(project, serviceAccount, isServiceAccountFile);
  }

  @Override
  public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
    delegate.abortTask(taskAttemptContext);
  }

  @Override
  public boolean isCommitJobRepeatable(JobContext jobContext) throws IOException {
    return delegate.isCommitJobRepeatable(jobContext);
  }

  @Override
  public boolean isRecoverySupported(JobContext jobContext) throws IOException {
    return delegate.isRecoverySupported(jobContext);
  }

  @Override
  public boolean isRecoverySupported() {
    return delegate.isRecoverySupported();
  }

  @Override
  public void recoverTask(TaskAttemptContext taskContext) throws IOException {
    delegate.recoverTask(taskContext);
  }
}
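For reference, the metadata write in updateMetricMetaData follows the standard google-cloud-storage builder pattern: rebuild the blob's info with the new content type and custom metadata, then persist it with update(). A minimal standalone sketch of the same call shape; the bucket, object name, and values here are illustrative, not taken from the PR:

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import java.util.Collections;

public class BlobMetadataUpdate {
  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    // Illustrative bucket/object; in the committer the blob is picked from the task attempt path.
    Blob blob = storage.get("my-bucket", "output/part-r-00000");
    if (blob != null) {
      // Same pattern as updateMetricMetaData: rebuild with the new content type and
      // custom metadata, then persist the change with update().
      blob.toBuilder()
        .setContentType("application/json")
        .setMetadata(Collections.singletonMap("recordcount", "42"))
        .build()
        .update();
    }
  }
}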