Skip to content

Commit 647988e

Browse files
author
Rachel Gagnon
committed
reformatting streaming benchmark
1 parent f3fa143 commit 647988e

File tree

2 files changed

+23
-15
lines changed

2 files changed

+23
-15
lines changed

Diff for: examples/dataflow-examples/java/dataflow-streaming-benchmark/src/main/java/com/google/cloud/pso/pipeline/StreamingBenchmark.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import java.util.ArrayList;
3737
import java.util.List;
3838
import java.util.Map;
39-
4039
import org.apache.beam.sdk.Pipeline;
4140
import org.apache.beam.sdk.PipelineResult;
4241
import org.apache.beam.sdk.io.FileSystems;
@@ -52,8 +51,8 @@
5251
import org.apache.beam.sdk.transforms.DoFn;
5352
import org.apache.beam.sdk.transforms.ParDo;
5453
import org.joda.time.Duration;
55-
import org.slf4j.LoggerFactory;
5654
import org.slf4j.Logger;
55+
import org.slf4j.LoggerFactory;
5756

5857
/**
5958
* The {@link StreamingBenchmark} is a streaming pipeline which generates messages at a specified

Diff for: examples/dataflow-examples/java/dataflow-streaming-benchmark/src/test/java/com/google/cloud/pso/pipeline/StreamingBenchmarkTest.java

+22-13
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import static org.hamcrest.CoreMatchers.notNullValue;
2222
import static org.hamcrest.MatcherAssert.assertThat;
2323

24-
import com.google.cloud.pso.pipeline.StreamingBenchmark.MessageGeneratorFn;
2524
import com.google.cloud.pso.pipeline.StreamingBenchmark.MalformedSchemaException;
25+
import com.google.cloud.pso.pipeline.StreamingBenchmark.MessageGeneratorFn;
2626
import com.google.common.io.ByteStreams;
2727
import java.io.ByteArrayInputStream;
2828
import java.io.File;
@@ -119,7 +119,8 @@ public void testMessageGenerator() throws IOException {
119119
PCollection<PubsubMessage> results =
120120
pipeline
121121
.apply("CreateInput", Create.of(0L))
122-
.apply("GenerateMessage", ParDo.of(new MessageGeneratorFn(file.getAbsolutePath(), true)));
122+
.apply(
123+
"GenerateMessage", ParDo.of(new MessageGeneratorFn(file.getAbsolutePath(), true)));
123124

124125
// Assert
125126
//
@@ -153,12 +154,16 @@ public void testInvalidSchemaThrowsException() throws IOException {
153154
PCollection<PubsubMessage> results =
154155
pipeline
155156
.apply("CreateInput", Create.of(0L))
156-
.apply("GenerateMessage", ParDo.of(new MessageGeneratorFn(file.getAbsolutePath(), true)));
157+
.apply(
158+
"GenerateMessage", ParDo.of(new MessageGeneratorFn(file.getAbsolutePath(), true)));
157159

158160
pipeline.run();
159161
}
160162

161-
/** Tests the {@link MessageGeneratorFn} should not fails when given invalid schema with validateSchema set to false. */
163+
/**
164+
* Tests the {@link MessageGeneratorFn} should not fails when given invalid schema with
165+
* validateSchema set to false.
166+
*/
162167
@Test
163168
public void testInvalidSchemaIgnoringValidation() throws IOException {
164169
// Arrange
@@ -171,21 +176,25 @@ public void testInvalidSchemaIgnoringValidation() throws IOException {
171176
// Act
172177
//
173178
PCollection<PubsubMessage> results =
174-
pipeline
175-
.apply("CreateInput", Create.of(0L))
176-
.apply("GenerateMessage", ParDo.of(new MessageGeneratorFn(file.getAbsolutePath(), false)));
179+
pipeline
180+
.apply("CreateInput", Create.of(0L))
181+
.apply(
182+
"GenerateMessage", ParDo.of(new MessageGeneratorFn(file.getAbsolutePath(), false)));
177183

178184
// Assert
179185
//
180-
PAssert.that(results).satisfies(input -> {
181-
PubsubMessage message = input.iterator().next();
182-
assertThat(message, is(notNullValue()));
183-
assertThat(new String(message.getPayload()), is(equalTo(schema)));
184-
return null;
185-
});
186+
PAssert.that(results)
187+
.satisfies(
188+
input -> {
189+
PubsubMessage message = input.iterator().next();
190+
assertThat(message, is(notNullValue()));
191+
assertThat(new String(message.getPayload()), is(equalTo(schema)));
192+
return null;
193+
});
186194

187195
pipeline.run();
188196
}
197+
189198
/**
190199
* Helper to generate files for testing.
191200
*

0 commit comments

Comments
 (0)