Skip to content

Commit 42f75fd

Browse files
authored
Merge pull request data-integrations#1357 from data-integrations/bugfix/plugin-1735
[PLUGIN-1735] Fix read timeout issue during GCS Bucket copy/move actions for large files
2 parents b4c7d04 + c3fa512 commit 42f75fd

File tree

9 files changed

+73
-6
lines changed

9 files changed

+73
-6
lines changed

docs/GCSCopy-action.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ This value is ignored if the bucket already exists.
5050
If the bucket already exists, this is ignored. More information can be found
5151
[here](https://cloud.google.com/data-fusion/docs/how-to/customer-managed-encryption-keys)
5252

53+
**Read Timeout:** Timeout in seconds to read data from an established HTTP connection (Default value is 20).
54+
For performing copy/move operation on large files in GCS buckets, a higher timeout might be needed. Setting it to 0
55+
implies infinite timeout (no limit on the timeout) [NOT RECOMMENDED]
56+
5357
Example
5458
-------
5559

docs/GCSMove-action.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ This value is ignored if the bucket already exists.
5151
If the bucket already exists, this is ignored. More information can be found
5252
[here](https://cloud.google.com/data-fusion/docs/how-to/customer-managed-encryption-keys)
5353

54+
**Read Timeout:** Timeout in seconds to read data from an established HTTP connection (Default value is 20).
55+
For performing copy/move operation on large files in GCS buckets, a higher timeout might be needed. Setting it to 0
56+
implies infinite timeout (no limit on the timeout) [NOT RECOMMENDED]
57+
5458
Example
5559
-------
5660

src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,7 @@ private void emitMetricsForStorageBucket(boolean succeeded, BatchSinkContext con
615615
}
616616
try {
617617
StorageClient storageClient = StorageClient.create(config.getProject(), config.getServiceAccount(),
618-
config.isServiceAccountFilePath());
618+
config.isServiceAccountFilePath(), null);
619619
storageClient.mapMetaDataForAllBlobs(outputPath,
620620
new MetricsEmitter(context.getMetrics())::emitMetrics);
621621
} catch (Exception e) {

src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.cdap.plugin.gcp.gcs;
1818

1919
import com.google.api.gax.paging.Page;
20+
import com.google.cloud.http.HttpTransportOptions;
2021
import com.google.cloud.kms.v1.CryptoKeyName;
2122
import com.google.cloud.storage.Blob;
2223
import com.google.cloud.storage.BlobId;
@@ -320,17 +321,21 @@ private static String toPath(BlobId blobId) {
320321
}
321322

322323
public static StorageClient create(String project, @Nullable String serviceAccount,
323-
Boolean isServiceAccountFilePath) throws IOException {
324+
Boolean isServiceAccountFilePath, @Nullable Integer readTimeout)
325+
throws IOException {
324326
StorageOptions.Builder builder = StorageOptions.newBuilder().setProjectId(project);
325327
if (serviceAccount != null) {
326328
builder.setCredentials(GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath));
327329
}
330+
if (readTimeout != null) {
331+
builder.setTransportOptions(HttpTransportOptions.newBuilder().setReadTimeout(readTimeout * 1000).build());
332+
}
328333
Storage storage = builder.build().getService();
329334
return new StorageClient(storage);
330335
}
331336

332337
public static StorageClient create(GCPConnectorConfig config) throws IOException {
333-
return create(config.getProject(), config.getServiceAccount(), config.isServiceAccountFilePath());
338+
return create(config.getProject(), config.getServiceAccount(), config.isServiceAccountFilePath(), null);
334339
}
335340

336341
/**

src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSCopy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void run(ActionContext context) throws IOException {
6767
return;
6868
}
6969
StorageClient storageClient = StorageClient.create(config.getProject(), config.getServiceAccount(),
70-
isServiceAccountFilePath);
70+
isServiceAccountFilePath, config.readTimeout);
7171

7272
GCSPath destPath = config.getDestPath();
7373
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);

src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSMove.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void run(ActionContext context) throws IOException {
6767
return;
6868
}
6969
StorageClient storageClient = StorageClient.create(config.getProject(), config.getServiceAccount(),
70-
isServiceAccountFilePath);
70+
isServiceAccountFilePath, config.readTimeout);
7171
GCSPath destPath = config.getDestPath();
7272
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
7373
collector.getOrThrowException();

src/main/java/io/cdap/plugin/gcp/gcs/actions/SourceDestConfig.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class SourceDestConfig extends GCPConfig {
4343
public static final String NAME_SOURCE_PATH = "sourcePath";
4444
public static final String NAME_DEST_PATH = "destPath";
4545
public static final String NAME_LOCATION = "location";
46+
public static final String READ_TIMEOUT = "readTimeout";
4647

4748
@Name(NAME_SOURCE_PATH)
4849
@Macro
@@ -74,15 +75,25 @@ public class SourceDestConfig extends GCPConfig {
7475
" at https://cloud.google.com/data-fusion/docs/how-to/customer-managed-encryption-keys")
7576
protected String cmekKey;
7677

78+
@Name(READ_TIMEOUT)
79+
@Macro
80+
@Nullable
81+
@Description("Timeout in seconds to read data from an established HTTP connection (Default value is 20). " +
82+
("For performing copy/move operation on large files in GCS buckets, set a higher value. " +
83+
"Set it to 0 for infinite(no limit)"))
84+
protected Integer readTimeout;
85+
7786
public SourceDestConfig(@Nullable String project, @Nullable String serviceAccountType,
7887
@Nullable String serviceFilePath, @Nullable String serviceAccountJson,
79-
@Nullable String destPath, @Nullable String location, @Nullable String cmekKey) {
88+
@Nullable String destPath, @Nullable String location, @Nullable Integer readTimeout,
89+
@Nullable String cmekKey) {
8090
this.serviceAccountType = serviceAccountType;
8191
this.serviceAccountJson = serviceAccountJson;
8292
this.serviceFilePath = serviceFilePath;
8393
this.project = project;
8494
this.destPath = destPath;
8595
this.location = location;
96+
this.readTimeout = readTimeout;
8697
this.cmekKey = cmekKey;
8798
}
8899

@@ -125,9 +136,22 @@ public void validate(FailureCollector collector, Map<String, String> arguments)
125136
if (!containsMacro(NAME_CMEK_KEY)) {
126137
validateCmekKey(collector, arguments);
127138
}
139+
if (!containsMacro(READ_TIMEOUT)) {
140+
validateReadTimeout(collector);
141+
}
128142
collector.getOrThrowException();
129143
}
130144

145+
void validateReadTimeout(FailureCollector collector) {
146+
if (readTimeout == null) {
147+
return;
148+
}
149+
if (readTimeout < 0) {
150+
collector.addFailure("Read Timeout cannot be less than 0. ",
151+
"Please enter 0 or a positive value.").withConfigProperty(READ_TIMEOUT);
152+
}
153+
}
154+
131155
//This method validated the pattern of CMEK Key resource ID.
132156
void validateCmekKey(FailureCollector failureCollector, Map<String, String> arguments) {
133157
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(cmekKey, arguments, failureCollector);
@@ -161,6 +185,7 @@ public static class Builder {
161185
private String destPath;
162186
private String cmekKey;
163187
private String location;
188+
private Integer readTimeout;
164189

165190
public SourceDestConfig.Builder setProject(@Nullable String project) {
166191
this.project = project;
@@ -205,6 +230,7 @@ public SourceDestConfig build() {
205230
serviceAccountJson,
206231
destPath,
207232
location,
233+
readTimeout,
208234
cmekKey
209235
);
210236
}

widgets/GCSCopy-action.json

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,20 @@
128128
}
129129
}
130130
]
131+
},
132+
{
133+
"label" : "Advanced",
134+
"properties" : [
135+
{
136+
"name": "readTimeout",
137+
"label" : "Read Timeout",
138+
"widget-type": "number",
139+
"widget-attributes": {
140+
"default": "20",
141+
"minimum": "0"
142+
}
143+
}
144+
]
131145
}
132146
],
133147
"filters": [

widgets/GCSMove-action.json

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,20 @@
128128
}
129129
}
130130
]
131+
},
132+
{
133+
"label" : "Advanced",
134+
"properties" : [
135+
{
136+
"name": "readTimeout",
137+
"label" : "Read Timeout",
138+
"widget-type": "number",
139+
"widget-attributes": {
140+
"default": "20",
141+
"minimum": "0"
142+
}
143+
}
144+
]
131145
}
132146
],
133147
"filters": [

0 commit comments

Comments
 (0)