10 changes: 0 additions & 10 deletions examples/java8/pom.xml
@@ -39,16 +39,6 @@

<build>
<plugins>
<!-- Disable javadoc for now.
TODO: this section should be removed as soon as possible. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>

<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
@@ -102,7 +102,7 @@ public class GameStats extends LeaderBoard {
/**
* Filter out all but those users with a high clickrate, which we will consider as 'spammy' users.
* We do this by finding the mean total score per user, then using that information as a side
* input to filter out all but those user scores that are > (mean * SCORE_WEIGHT)
* input to filter out all but those user scores that are greater than (mean * SCORE_WEIGHT).
*/
// [START DocInclude_AbuseDetect]
public static class CalculateSpammyUsers
@@ -193,6 +193,8 @@ static interface Options extends LeaderBoard.Options {
/**
* Create a map of information that describes how to write pipeline output to BigQuery. This map
* is used to write information about team score sums.
*
* @return The map of fields to write to BigQuery.
We should really disable whatever requirement enforces this. The statement is often redundant with the basic javadoc and just adds burden. It is useful in inheritance situations because it is separate from {@inheritDoc}, but otherwise I don't think it matters much.

*/
protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
configureWindowedWrite() {
@@ -219,6 +221,8 @@ static interface Options extends LeaderBoard.Options {
/**
* Create a map of information that describes how to write pipeline output to BigQuery. This map
* is used to write information about mean user session time.
*
* @return The map of fields to write to BigQuery.
*/
protected static Map<String, WriteWindowedToBigQuery.FieldInfo<Double>>
configureSessionWindowWrite() {
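These configure*Write() helpers all build the same kind of map, pairing a BigQuery field name with a description of how to populate it. A minimal sketch of one such method follows; the FieldInfo constructor shown here (a BigQuery type string plus a per-element extraction function) is an assumption inferred from the javadoc, not code taken from this change.

protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
    configureWindowedWrite() {
  Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
      new HashMap<>();  // java.util.HashMap
  // Assumed FieldInfo shape: the BigQuery field type plus a function that
  // extracts the field value from each (team, score) element.
  tableConfigure.put("team",
      new WriteWindowedToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
  tableConfigure.put("total_score",
      new WriteWindowedToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
  return tableConfigure;
}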
@@ -119,6 +119,8 @@ static interface Options extends UserScore.Options {
* Create a map of information that describes how to write pipeline output to BigQuery. This map
* is passed to the {@link WriteWindowedToBigQuery} constructor to write team score sums and
* includes information about window start time.
*
* @return The map of fields to write to BigQuery.
*/
protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
configureWindowedTableWrite() {
@@ -142,6 +144,9 @@ static interface Options extends UserScore.Options {

/**
* Run a batch pipeline to do windowed analysis of the data.
*
* @param args The command-line arguments.
* @throws Exception in case of execution failure.
*/
// [START DocInclude_HTSMain]
public static void main(String[] args) throws Exception {
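The main method these new tags document is collapsed above. For context, a minimal sketch of such a batch entry point in these examples, with the transform steps omitted and all names illustrative:

public static void main(String[] args) throws Exception {
  // Parse the command-line arguments into this example's Options interface.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  // ... read game events, apply fixed hourly windows, sum scores per team,
  // and write the results to BigQuery via configureWindowedTableWrite() ...

  pipeline.run();
}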
@@ -72,7 +72,7 @@
* results, e.g. for 'team prizes'. We're now outputting window results as they're
* calculated, giving us much lower latency than with the previous batch examples.
*
* <p> Run {@link injector.Injector} to generate pubsub data for this pipeline. The Injector
* <p> Run {@code injector.Injector} to generate pubsub data for this pipeline. The Injector
* documentation provides more detail on how to do this.
*
* <p> To execute this pipeline using the Dataflow service, specify the pipeline configuration
@@ -128,6 +128,8 @@ static interface Options extends HourlyTeamScore.Options, DataflowExampleOptions
/**
* Create a map of information that describes how to write pipeline output to BigQuery. This map
* is used to write team score sums and includes event timing information.
*
* @return The map of fields to write to BigQuery.
*/
protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
configureWindowedTableWrite() {
@@ -158,6 +160,8 @@ static interface Options extends HourlyTeamScore.Options, DataflowExampleOptions
/**
* Create a map of information that describes how to write pipeline output to BigQuery. This map
* is used to write user score sums.
*
* @return The map of fields to write to BigQuery.
*/
protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
configureGlobalWindowBigQueryWrite() {
@@ -70,7 +70,7 @@
* where the BigQuery dataset you specify must already exist.
*
* <p> Optionally include the --input argument to specify a batch input file.
* See the --input default value for example batch data file, or use {@link injector.Injector} to
* See the --input default value for an example batch data file, or use {@code injector.Injector} to
* generate your own batch data.
*/
public class UserScore {
@@ -201,6 +201,8 @@ public static interface Options extends PipelineOptions {
/**
* Create a map of information that describes how to write pipeline output to BigQuery. This map
* is passed to the {@link WriteToBigQuery} constructor to write user score sums.
*
* @return The map of fields to write to BigQuery.
*/
protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
configureBigQueryWrite() {
@@ -216,6 +218,9 @@ public static interface Options extends PipelineOptions {

/**
* Run a batch pipeline.
*
* @param args The command-line arguments.
Also, for @param I don't think it is that helpful to require the annotations. Usually it is better to have sentences that actually describe what is going on. Code review is enough to make sure that someone adds @param when it is needed, but that is maybe 10% of the time or less, IMO.

* @throws Exception in case of execution failure.
*/
// [START DocInclude_USMain]
public static void main(String[] args) throws Exception {
@@ -100,7 +100,10 @@ public void processElement(ProcessContext c) {
}
}

/** Build the output table schema. */
/** Build the output table schema.
*
* @return The {@link TableSchema}.
*/
protected TableSchema getSchema() {
List<TableFieldSchema> fields = new ArrayList<>();
for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
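The loop body of getSchema() is collapsed above. A hedged reconstruction of the rest of the method; the getFieldType() accessor on FieldInfo is an assumption:

protected TableSchema getSchema() {
  List<TableFieldSchema> fields = new ArrayList<>();
  for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
    // One BigQuery column per map entry: the key names the field and the
    // (assumed) getFieldType() accessor supplies its BigQuery type.
    fields.add(new TableFieldSchema()
        .setName(entry.getKey())
        .setType(entry.getValue().getFieldType()));
  }
  return new TableSchema().setFields(fields);
}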
10 changes: 0 additions & 10 deletions runners/flink/examples/pom.xml
@@ -86,16 +86,6 @@
</plugin>
-->

<!-- Disable javadoc for now.
TODO: this section should be removed as soon as possible. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
@@ -110,6 +110,10 @@ private interface Options extends PipelineOptions, FlinkPipelineOptions {

/**
* Lists documents contained beneath the {@code options.input} prefix/directory.
*
* @param options the pipeline options.
* @throws URISyntaxException in case of a malformed input URI.
* @throws IOException if the input documents cannot be read.
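* @return The set of URIs of the documents found beneath the prefix.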
*/
public static Set<URI> listInputDocuments(Options options)
throws URISyntaxException, IOException {
@@ -55,11 +55,11 @@

/**
* To run the example, first open a socket on a terminal by executing the command:
* <li>
* <ul>
* <li>
* <code>nc -lk 9999</code>
* </li>
* </li>
* </ul>
* and then launch the example. Now whatever you type in the terminal is going to be
* the input to the program.
* */
@@ -42,14 +42,14 @@

/**
* To run the example, first open two sockets on two terminals by executing the commands:
* <li>
* <ul>
* <li>
* <code>nc -lk 9999</code>, and
* </li>
* <li>
* <code>nc -lk 9998</code>
* </li>
* </li>
* </ul>
* and then launch the example. Now whatever you type in the terminal is going to be
* the input to the program.
* */
@@ -171,7 +171,8 @@ public static void main(String[] args) {

/**
* Serialization/Deserialization schema for Avro types.
* @param <T>
*
* @param <T> The type to serialize and deserialize.
*/
static class AvroSerializationDeserializationSchema<T>
implements SerializationSchema<T>, DeserializationSchema<T> {
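A hedged sketch of how such a combined schema might be implemented for Avro specific records; everything below (the type bound, the constructor, the use of SpecificDatumWriter/SpecificDatumReader) is illustrative rather than taken from this change:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

static class AvroSerializationDeserializationSchema<T extends SpecificRecord>
    implements SerializationSchema<T>, DeserializationSchema<T> {

  private final Class<T> avroType;

  AvroSerializationDeserializationSchema(Class<T> avroType) {
    this.avroType = avroType;
  }

  @Override
  public byte[] serialize(T element) {
    try {
      // Encode the record with Avro's binary encoding.
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
      new SpecificDatumWriter<>(avroType).write(element, encoder);
      encoder.flush();
      return out.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException("Avro serialization failed", e);
    }
  }

  @Override
  public T deserialize(byte[] message) throws IOException {
    // Decode one record from the raw bytes.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(message, null);
    return new SpecificDatumReader<>(avroType).read(null, decoder);
  }

  @Override
  public boolean isEndOfStream(T nextElement) {
    return false;  // The socket/Kafka stream is unbounded.
  }

  @Override
  public TypeInformation<T> getProducedType() {
    return TypeExtractor.getForClass(avroType);
  }
}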
@@ -44,11 +44,11 @@

/**
* To run the example, first open a socket on a terminal by executing the command:
* <li>
* <ul>
* <li>
* <code>nc -lk 9999</code>
* </li>
* </li>
* </ul>
* and then launch the example. Now whatever you type in the terminal is going to be
* the input to the program.
* */
10 changes: 0 additions & 10 deletions runners/flink/runner/pom.xml
@@ -159,16 +159,6 @@
</plugin>
-->

<!-- Disable javadoc for now.
TODO: this section should be removed as soon as possible. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>

<!-- Integration Tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -120,6 +120,8 @@ private void createPipelineTranslator() {
* the {@link org.apache.beam.sdk.values.PCollection} program into
* a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream}
* one.
*
* @param pipeline The pipeline to translate.
* */
public void translate(Pipeline pipeline) {
checkInitializationState();
@@ -134,7 +136,10 @@ public void translate(Pipeline pipeline) {

/**
* Launches the program execution.
* */
*
* @return The job execution result.
* @throws Exception in case of an error during pipeline execution.
*/
public JobExecutionResult executePipeline() throws Exception {
if (options.isStreaming()) {
if (this.flinkStreamEnv == null) {
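A hedged sketch of how a caller might drive these two methods; flinkEnv stands in for an instance of this translation environment, whose class name is not shown in the diff:

// Translate the Beam pipeline into a Flink plan, then run it.
flinkEnv.translate(pipeline);
JobExecutionResult result = flinkEnv.executePipeline();
System.out.println("Net runtime: " + result.getNetRuntime() + " ms");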
@@ -39,11 +39,17 @@
public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions {

/**
* <p>
* List of local files to make available to workers.
* </p>
* <p>
* Jars are placed on the worker's classpath.
* </p>
* <p>
* The default value is the list of jars from the main program's classpath.
* </p>
*
* @return The list of files to stage.
*/
@Description("Jar-Files to send to all workers and put on the classpath. " +
"The default value is all files from the classpath.")
@@ -53,6 +59,8 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp

/**
* The job name is used to identify jobs running on a Flink cluster.
*
* @return The Flink job name.
*/
@Description("Flink job name, to uniquely identify active jobs. "
+ "Defaults to using the ApplicationName-UserName-Date.")
@@ -66,6 +74,8 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
* Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink
* Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while
* "[auto]" will let the system decide where to execute the pipeline based on the environment.
*
* @return The Flink master address.
*/
@Description("Address of the Flink Master where the Pipeline should be executed. Can" +
" either be of the form \"host:port\" or one of the special values [local], " +
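A short usage sketch for these options; the setters are assumed to mirror the getters declared in this interface, and the job name is purely illustrative:

FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
options.setFlinkMaster("[local]");      // or "host:port", "[collection]", "[auto]"
options.setJobName("game-stats-demo");  // hypothetical job name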
@@ -41,10 +41,11 @@
import java.util.Map;

/**
* <p>
* A {@link PipelineRunner} that executes the operations in the
* pipeline by first translating them to a Flink Plan and then executing them either locally
* or on a Flink cluster, depending on the configuration.
* <p>
* </p>
*/
public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {

@@ -133,6 +134,8 @@ public FlinkRunnerResult run(Pipeline pipeline) {

/**
* For testing.
*
* @return The {@link FlinkPipelineOptions}.
*/
public FlinkPipelineOptions getPipelineOptions() {
return options;
@@ -98,7 +98,7 @@ public <T> void setOutputDataSet(PValue value, DataSet<WindowedValue<T>> set) {

/**
* Sets the AppliedPTransform which carries input/output.
* @param currentTransform
*
* @param currentTransform The current applied {@link PTransform}.
*/
public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
this.currentTransform = currentTransform;
@@ -32,7 +32,7 @@
* {@link org.apache.beam.sdk.values.PCollection}-based job into a
* {@link org.apache.flink.streaming.api.datastream.DataStream} one.
*
* This is based on {@link org.apache.beam.runners.dataflow.DataflowPipelineTranslator}
* This is based on {@code org.apache.beam.runners.dataflow.DataflowPipelineTranslator}
* */
public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {

@@ -82,7 +82,8 @@ public void setOutputDataStream(PValue value, DataStream<?> set) {

/**
* Sets the AppliedPTransform which carries input/output.
* @param currentTransform
*
* @param currentTransform The current applied {@link PTransform}.
*/
public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
this.currentTransform = currentTransform;
@@ -27,6 +27,8 @@ public class InspectableByteArrayOutputStream extends ByteArrayOutputStream {

/**
* Get the underlying byte array.
*
* @return The underlying {@code byte[]} buffer, without copying.
*/
public byte[] getBuffer() {
return buf;
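The point of exposing the protected buf field is to avoid the defensive copy that ByteArrayOutputStream.toByteArray() makes on every call. A small usage sketch; the coder call is illustrative:

InspectableByteArrayOutputStream out = new InspectableByteArrayOutputStream();
coder.encode(value, out, Coder.Context.OUTER);  // illustrative: any writer works
byte[] bytes = out.getBuffer();  // direct reference to the buffer, no copy
int length = out.size();         // only the first 'length' bytes are valid data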