Skip to content

dataflow-examples folder #1451

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -49,9 +49,8 @@
* how to enhance the documents with external data sitting in Cloud Bigtable prior to indexing it
* into Elasticsearch.
*
* Note: The idField should be provided in {@link com.fasterxml.jackson.core.JsonPointer} syntax
* e.g.: { "sku": 123}
* will have an id field of: /sku
* <p>Note: The idField should be provided in {@link com.fasterxml.jackson.core.JsonPointer} syntax
* e.g.: { "sku": 123} will have an id field of: /sku
*
* <pre>
* Build and execute:
Original file line number Diff line number Diff line change
@@ -17,13 +17,13 @@
package com.google.cloud.pso.coders;

import com.google.cloud.pso.common.ErrorMessage;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** {@link org.apache.beam.sdk.coders.Coder} for {@link ErrorMessage} */
public class ErrorMessageCoder extends CustomCoder<ErrorMessage> {
Original file line number Diff line number Diff line change
@@ -19,12 +19,12 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class JsonNodeCoder extends CustomCoder<JsonNode> {

Original file line number Diff line number Diff line change
@@ -16,11 +16,11 @@

package com.google.cloud.pso.common;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;

import static com.google.common.base.Preconditions.checkArgument;

/**
* A class to hold messages that fail validation either because the json is not well formed or
* because the key element required to be present within the json is missing. The class encapsulates
Original file line number Diff line number Diff line change
@@ -16,8 +16,11 @@

package com.google.cloud.pso.common;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import java.io.IOException;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -32,9 +35,6 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;

import static com.google.common.base.Preconditions.checkArgument;

/**
* This {@link PTransform} accepts a {@link PCollection} of key and payload {@link KV} and performs
Original file line number Diff line number Diff line change
@@ -16,12 +16,15 @@

package com.google.cloud.pso.common;

import static com.google.common.base.Preconditions.checkArgument;

import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.google.auto.value.AutoValue;
import com.google.common.base.Throwables;
import java.io.IOException;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -30,9 +33,6 @@
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import java.io.IOException;

import static com.google.common.base.Preconditions.checkArgument;

/**
* A {@link PTransform} that validates a {@link PCollection<String>} and performs the following
Original file line number Diff line number Diff line change
@@ -18,10 +18,10 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import java.io.IOException;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/**
* A {@link DoFn} that extracts a Boolean value associated with a json payload and adds a new field
Original file line number Diff line number Diff line change
@@ -25,6 +25,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Random;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -44,8 +46,6 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.json.JSONArray;
import org.json.JSONObject;
import java.io.IOException;
import java.util.Random;

/**
* A utility Dataflow pipeline to generate sample products data available at
@@ -55,9 +55,9 @@
* and table with a column family to stage the metadata for the demo. 3. Create a PubSub topic to
* publish product JSONs to.
*
* Note: The idField should be provided in {@link com.fasterxml.jackson.core.JsonPointer} syntax
* e.g.: { "sku": 123}
* will have an id field of: /sku
* <p>Note: The idField should be provided in {@link com.fasterxml.jackson.core.JsonPointer} syntax
* e.g.: { "sku": 123} will have an id field of: /sku
*
* <pre>
* Build and execute:
* mvn compile exec:java -Dexec.mainClass=com.google.cloud.pso.utils.PublishProducts -Dexec.args=" \
@@ -255,4 +255,4 @@ public void processElement(ProcessContext context) throws IOException {
context.output(KV.of(ByteString.copyFrom(key), mutations));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -16,16 +16,16 @@

package com.google.cloud.pso.common;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.io.IOException;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

/** Test class for {@link com.google.cloud.pso.common.ExtractKeyFn} */
@RunWith(JUnit4.class)
Original file line number Diff line number Diff line change
@@ -16,6 +16,8 @@

package com.google.cloud.pso.common;

import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.pso.coders.ErrorMessageCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
@@ -33,8 +35,6 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import static com.google.common.truth.Truth.assertThat;

/** Test class for {@link FailSafeValidate} */
@RunWith(JUnit4.class)
public class FailSafeValidateTest {
Original file line number Diff line number Diff line change
@@ -16,11 +16,15 @@

package com.google.cloud.pso.utils;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.bigtable.v2.Mutation;
import com.google.cloud.pso.coders.JsonNodeCoder;
import com.google.protobuf.ByteString;
import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
@@ -36,10 +40,6 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.io.IOException;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

/** Test class for {@link PublishProducts} */
@RunWith(JUnit4.class)
Original file line number Diff line number Diff line change
@@ -36,7 +36,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
@@ -52,8 +51,8 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.joda.time.Duration;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The {@link StreamingBenchmark} is a streaming pipeline which generates messages at a specified
Original file line number Diff line number Diff line change
@@ -21,8 +21,8 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;

import com.google.cloud.pso.pipeline.StreamingBenchmark.MessageGeneratorFn;
import com.google.cloud.pso.pipeline.StreamingBenchmark.MalformedSchemaException;
import com.google.cloud.pso.pipeline.StreamingBenchmark.MessageGeneratorFn;
import com.google.common.io.ByteStreams;
import java.io.ByteArrayInputStream;
import java.io.File;
@@ -119,7 +119,8 @@ public void testMessageGenerator() throws IOException {
PCollection<PubsubMessage> results =
pipeline
.apply("CreateInput", Create.of(0L))
.apply("GenerateMessage", ParDo.of(new MessageGeneratorFn(file.getAbsolutePath(), true)));
.apply(
"GenerateMessage", ParDo.of(new MessageGeneratorFn(file.getAbsolutePath(), true)));

// Assert
//
@@ -153,12 +154,16 @@ public void testInvalidSchemaThrowsException() throws IOException {
PCollection<PubsubMessage> results =
pipeline
.apply("CreateInput", Create.of(0L))
.apply("GenerateMessage", ParDo.of(new MessageGeneratorFn(file.getAbsolutePath(), true)));
.apply(
"GenerateMessage", ParDo.of(new MessageGeneratorFn(file.getAbsolutePath(), true)));

pipeline.run();
}

/** Tests the {@link MessageGeneratorFn} should not fails when given invalid schema with validateSchema set to false. */
/**
* Tests the {@link MessageGeneratorFn} should not fails when given invalid schema with
* validateSchema set to false.
*/
@Test
public void testInvalidSchemaIgnoringValidation() throws IOException {
// Arrange
@@ -171,21 +176,25 @@ public void testInvalidSchemaIgnoringValidation() throws IOException {
// Act
//
PCollection<PubsubMessage> results =
pipeline
.apply("CreateInput", Create.of(0L))
.apply("GenerateMessage", ParDo.of(new MessageGeneratorFn(file.getAbsolutePath(), false)));
pipeline
.apply("CreateInput", Create.of(0L))
.apply(
"GenerateMessage", ParDo.of(new MessageGeneratorFn(file.getAbsolutePath(), false)));

// Assert
//
PAssert.that(results).satisfies(input -> {
PubsubMessage message = input.iterator().next();
assertThat(message, is(notNullValue()));
assertThat(new String(message.getPayload()), is(equalTo(schema)));
return null;
});
PAssert.that(results)
.satisfies(
input -> {
PubsubMessage message = input.iterator().next();
assertThat(message, is(notNullValue()));
assertThat(new String(message.getPayload()), is(equalTo(schema)));
return null;
});

pipeline.run();
}

/**
* Helper to generate files for testing.
*
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
33 changes: 0 additions & 33 deletions examples/dataflow-xml-pubsub-to-gcs/python/.vscode/launch.json

This file was deleted.

5 changes: 0 additions & 5 deletions helpers/exclusion_list.txt
Original file line number Diff line number Diff line change
@@ -45,11 +45,6 @@
./examples/cloudml-sklearn-pipeline
./examples/cloudsql-custom-metric
./examples/cryptorealtime
./examples/dataflow-bigquery-transpose
./examples/dataflow-data-generator
./examples/dataflow-elasticsearch-indexer
./examples/dataflow-scala-kafka2avro
./examples/dataflow-streaming-benchmark
./examples/dataproc-persistent-history-server
./examples/dlp
./examples/e2e-home-appliance-status-monitoring