diff --git a/examples/java/build.gradle b/examples/java/build.gradle index af0d7b80bee9..d03c5aa2f797 100644 --- a/examples/java/build.gradle +++ b/examples/java/build.gradle @@ -91,6 +91,8 @@ dependencies { compile "org.apache.commons:commons-lang3:3.9" compile "org.apache.httpcomponents:httpclient:4.5.13" compile "org.apache.httpcomponents:httpcore:4.4.13" + compile ("org.twitter4j:twitter4j-stream:4.0.7") + compile ("org.twitter4j:twitter4j-core:4.0.7") testCompile project(path: ":runners:direct-java", configuration: "shadow") testCompile project(":sdks:java:io:google-cloud-platform") testCompile project(":sdks:java:extensions:ml") diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/README.md b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/README.md new file mode 100644 index 000000000000..41e64fa74389 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/README.md @@ -0,0 +1,41 @@ + + +# Twitter Connector + +This directory contains an example pipeline that shows how to read a continuous stream of data from the Twitter streaming API (or any other third-party API). This includes: + + + +## Requirements + +- Java 8 +- Twitter developer app account and streaming credentials. + +This section describes what is needed to get the example up and running. 
+ +- Gradle preparation +- Local execution \ No newline at end of file diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/ReadFromTwitterDoFn.java b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/ReadFromTwitterDoFn.java new file mode 100644 index 000000000000..d4b0d8a22600 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/ReadFromTwitterDoFn.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples.complete.twitterstreamgenerator; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import twitter4j.Status; + +/** Splittable dofn that read live data off twitter. 
* */
+@DoFn.UnboundedPerElement
+final class ReadFromTwitterDoFn extends DoFn<TwitterConfig, String> {
+
+  /** Instant at which this DoFn object was constructed; passed to every tracker it creates. */
+  private final DateTime startTime;
+
+  ReadFromTwitterDoFn() {
+    this.startTime = new DateTime();
+  }
+  /* Logger for class.*/
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFromTwitterDoFn.class);
+
+  /**
+   * Restriction for this DoFn: the configuration that was last claimed (if any) together with the
+   * running claim counter maintained by {@link OffsetTracker#tryClaim}.
+   */
+  static class OffsetHolder implements Serializable {
+    public final @Nullable TwitterConfig twitterConfig;
+    public final @Nullable Long fetchedRecords;
+
+    OffsetHolder(@Nullable TwitterConfig twitterConfig, @Nullable Long fetchedRecords) {
+      this.twitterConfig = twitterConfig;
+      this.fetchedRecords = fetchedRecords;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      OffsetHolder that = (OffsetHolder) o;
+      return Objects.equals(twitterConfig, that.twitterConfig)
+          && Objects.equals(fetchedRecords, that.fetchedRecords);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(twitterConfig, fetchedRecords);
+    }
+  }
+
+  /**
+   * Tracker that accepts claims until either the configured tweet count or the configured running
+   * time (measured from {@code startTime}) has been exceeded.
+   */
+  static class OffsetTracker extends RestrictionTracker<OffsetHolder, TwitterConfig>
+      implements Serializable {
+    private static final long MILLIS_PER_MINUTE = 60 * 1000;
+
+    private OffsetHolder restriction;
+    private final DateTime startTime;
+
+    OffsetTracker(OffsetHolder holder, DateTime startTime) {
+      this.restriction = holder;
+      this.startTime = startTime;
+    }
+
+    /**
+     * Increments the claim counter and accepts the claim unless a limit has been passed.
+     *
+     * @param twitterConfig configuration supplying the tweet-count and minutes-to-run limits
+     * @return {@code false} once the incremented counter is greater than {@code getTweetsCount()}
+     *     or the elapsed time is greater than {@code getMinutesToRun()} minutes; {@code true}
+     *     otherwise, in which case the restriction is replaced with the new counter value
+     */
+    @Override
+    public boolean tryClaim(TwitterConfig twitterConfig) {
+      // Read the previous counter once, null-safely, so the log statement below cannot fail on a
+      // missing restriction that the counter computation explicitly tolerates.
+      Long previouslyFetched = this.restriction == null ? null : this.restriction.fetchedRecords;
+      LOG.debug(
+          "-------------- Claiming {} used to have: {}",
+          twitterConfig.hashCode(),
+          previouslyFetched);
+      long fetchedRecords = previouslyFetched == null ? 0 : previouslyFetched + 1;
+      long elapsedTime = System.currentTimeMillis() - startTime.getMillis();
+      long allowedTime = twitterConfig.getMinutesToRun() * MILLIS_PER_MINUTE;
+      LOG.debug("-------------- Time running: {} / {}", elapsedTime, allowedTime);
+      if (fetchedRecords > twitterConfig.getTweetsCount() || elapsedTime > allowedTime) {
+        return false;
+      }
+      this.restriction = new OffsetHolder(twitterConfig, fetchedRecords);
+      return true;
+    }
+
+    @Override
+    public OffsetHolder currentRestriction() {
+      return restriction;
+    }
+
+    /**
+     * Returns a split whose primary is an empty holder with a zero counter and whose residual is
+     * the current restriction; {@code fractionOfRemainder} is only logged.
+     */
+    @Override
+    public SplitResult<OffsetHolder> trySplit(double fractionOfRemainder) {
+      LOG.debug("-------------- Trying to split: fractionOfRemainder={}", fractionOfRemainder);
+      return SplitResult.of(new OffsetHolder(null, 0L), restriction);
+    }
+
+    /** Intentionally performs no validation. */
+    @Override
+    public void checkDone() throws IllegalStateException {}
+
+    @Override
+    public IsBounded isBounded() {
+      return IsBounded.UNBOUNDED;
+    }
+  }
+
+  /** Uses the timestamp of the input element as the initial watermark estimator state. */
+  @GetInitialWatermarkEstimatorState
+  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
+    return currentElementTimestamp;
+  }
+
+  /** Clamps {@code timestamp} into the range accepted by {@link BoundedWindow}. */
+  private static Instant ensureTimestampWithinBounds(Instant timestamp) {
+    if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+      timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    }
+    return timestamp;
+  }
+
+  /** Creates a manual watermark estimator starting at the (clamped) stored state. */
+  @NewWatermarkEstimator
+  public WatermarkEstimators.Manual newWatermarkEstimator(
+      @WatermarkEstimatorState Instant watermarkEstimatorState) {
+    return new WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
+  }
+
+  /** Every element starts with an empty holder and a zero counter. */
+  @DoFn.GetInitialRestriction
+  public OffsetHolder getInitialRestriction(@Element TwitterConfig twitterConfig)
+      throws IOException {
+    return new OffsetHolder(null, 0L);
+  }
+
+  @DoFn.NewTracker
+  public RestrictionTracker<OffsetHolder, TwitterConfig> newTracker(
+      @Element TwitterConfig twitterConfig, @DoFn.Restriction OffsetHolder restriction) {
+    return new OffsetTracker(restriction, startTime);
+  }
+
+  @GetRestrictionCoder
+  public Coder<OffsetHolder> getRestrictionCoder() {
+    return SerializableCoder.of(OffsetHolder.class);
+  }
+
+  /**
+   * Drains the statuses currently buffered by the connection for {@code twitterConfig} and emits
+   * the text of each one, timestamped with its creation time.
+   *
+   * @return {@code stop()} as soon as a claim is rejected (the stream is closed at that point),
+   *     otherwise {@code resume()} with a one second delay
+   */
+  @DoFn.ProcessElement
+  public DoFn.ProcessContinuation processElement(
+      @Element TwitterConfig twitterConfig,
+      DoFn.OutputReceiver<String> out,
+      RestrictionTracker<OffsetHolder, TwitterConfig> tracker,
+      ManualWatermarkEstimator<Instant> watermarkEstimator) {
+    LOG.debug("In Read From Twitter Do Fn");
+    TwitterConnection twitterConnection = TwitterConnection.getInstance(twitterConfig);
+    BlockingQueue<Status> queue = twitterConnection.getQueue();
+    // A claim is attempted even when nothing is buffered, so that the count/time limits are
+    // still evaluated on an idle stream.
+    if (queue.isEmpty()) {
+      if (checkIfDone(twitterConnection, twitterConfig, tracker)) {
+        return DoFn.ProcessContinuation.stop();
+      }
+    }
+    while (!queue.isEmpty()) {
+      Status status = queue.poll();
+      if (checkIfDone(twitterConnection, twitterConfig, tracker)) {
+        return DoFn.ProcessContinuation.stop();
+      }
+      if (status != null) {
+        Instant currentInstant = Instant.ofEpochMilli(status.getCreatedAt().getTime());
+        watermarkEstimator.setWatermark(currentInstant);
+        out.outputWithTimestamp(status.getText(), currentInstant);
+      }
+    }
+    return DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1));
+  }
+
+  /**
+   * Attempts a claim and closes the stream when it is rejected.
+   *
+   * @return {@code true} when the claim was rejected and processing should stop
+   */
+  boolean checkIfDone(
+      TwitterConnection twitterConnection,
+      TwitterConfig twitterConfig,
+      RestrictionTracker<OffsetHolder, TwitterConfig> tracker) {
+    if (!tracker.tryClaim(twitterConfig)) {
+      twitterConnection.closeStream();
+      return true;
+    } else {
+      return false;
+    }
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterConfig.java b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterConfig.java
new file mode 100644
index 000000000000..60346893fceb
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterConfig.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.complete.twitterstreamgenerator; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** {@link Serializable} object to store twitter configurations for a connection. 
* */
+@DefaultCoder(SerializableCoder.class)
+public class TwitterConfig implements Serializable {
+  private final String key;
+  private final String secret;
+  private final String token;
+  private final String tokenSecret;
+  private final List<String> filters;
+  private final String language;
+  private final Long tweetsCount;
+  private final Integer minutesToRun;
+
+  private TwitterConfig(TwitterConfig.Builder builder) {
+    this.key = builder.key;
+    this.secret = builder.secret;
+    this.token = builder.token;
+    this.tokenSecret = builder.tokenSecret;
+    // Copy the filters: this object takes part in equals/hashCode, so a later change to the
+    // caller's list must not be able to alter an already-built configuration.
+    this.filters = builder.filters == null ? new ArrayList<>() : new ArrayList<>(builder.filters);
+    this.language = builder.language;
+    this.tweetsCount = builder.tweetsCount;
+    this.minutesToRun = builder.minutesToRun;
+  }
+
+  /** Two configurations are equal when all eight of their settings are equal. */
+  @Override
+  public boolean equals(@Nullable Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TwitterConfig that = (TwitterConfig) o;
+    return Objects.equals(key, that.key)
+        && Objects.equals(secret, that.secret)
+        && Objects.equals(token, that.token)
+        && Objects.equals(tokenSecret, that.tokenSecret)
+        && Objects.equals(filters, that.filters)
+        && Objects.equals(language, that.language)
+        && Objects.equals(tweetsCount, that.tweetsCount)
+        && Objects.equals(minutesToRun, that.minutesToRun);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        key, secret, token, tokenSecret, filters, language, tweetsCount, minutesToRun);
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public String getSecret() {
+    return secret;
+  }
+
+  public String getToken() {
+    return token;
+  }
+
+  public String getTokenSecret() {
+    return tokenSecret;
+  }
+
+  public List<String> getFilters() {
+    return filters;
+  }
+
+  public String getLanguage() {
+    return language;
+  }
+
+  public Long getTweetsCount() {
+    return tweetsCount;
+  }
+
+  public Integer getMinutesToRun() {
+    return minutesToRun;
+  }
+
+  /**
+   * Builder for {@link TwitterConfig}. Unset credentials default to the empty string, filters to
+   * an empty list, the language to {@code "en"}, the tweet count to {@link Long#MAX_VALUE} and
+   * the minutes to run to {@link Integer#MAX_VALUE}.
+   */
+  public static class Builder {
+    private String key = "";
+    private String secret = "";
+    private String token = "";
+    private String tokenSecret = "";
+    private List<String> filters = new ArrayList<>();
+    private String language = "en";
+    private Long tweetsCount = Long.MAX_VALUE;
+    private Integer minutesToRun = Integer.MAX_VALUE;
+
+    TwitterConfig.Builder setKey(final String key) {
+      this.key = key;
+      return this;
+    }
+
+    TwitterConfig.Builder setSecret(final String secret) {
+      this.secret = secret;
+      return this;
+    }
+
+    TwitterConfig.Builder setToken(final String token) {
+      this.token = token;
+      return this;
+    }
+
+    TwitterConfig.Builder setTokenSecret(final String tokenSecret) {
+      this.tokenSecret = tokenSecret;
+      return this;
+    }
+
+    TwitterConfig.Builder setFilters(final List<String> filters) {
+      this.filters = filters;
+      return this;
+    }
+
+    TwitterConfig.Builder setLanguage(final String language) {
+      this.language = language;
+      return this;
+    }
+
+    TwitterConfig.Builder setTweetsCount(final Long tweetsCount) {
+      this.tweetsCount = tweetsCount;
+      return this;
+    }
+
+    TwitterConfig.Builder setMinutesToRun(final Integer minutesToRun) {
+      this.minutesToRun = minutesToRun;
+      return this;
+    }
+
+    /** Creates the configuration from the values set so far. */
+    TwitterConfig build() {
+      return new TwitterConfig(this);
+    }
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterConnection.java b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterConnection.java
new file mode 100644
index 000000000000..e6cb223f2686
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterConnection.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.twitterstreamgenerator;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+import twitter4j.conf.ConfigurationBuilder;
+
+/**
+ * Twitter stream connection that buffers received statuses in a queue. {@link
+ * #getInstance(TwitterConfig)} keeps one shared instance per distinct {@link TwitterConfig}.
+ */
+class TwitterConnection {
+  private static final Logger LOG = LoggerFactory.getLogger(TwitterConnection.class);
+
+  private final BlockingQueue<Status> queue;
+  private final TwitterStream twitterStream;
+  static final ConcurrentHashMap<TwitterConfig, TwitterConnection> INSTANCE_MAP =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates a new Twitter connection and starts the filtered stream.
+   *
+   * @param twitterConfig configuration for twitter connection
+   */
+  TwitterConnection(TwitterConfig twitterConfig) {
+    this.queue = new LinkedBlockingQueue<>();
+    ConfigurationBuilder cb = new ConfigurationBuilder();
+    cb.setDebugEnabled(true)
+        .setOAuthConsumerKey(twitterConfig.getKey())
+        .setOAuthConsumerSecret(twitterConfig.getSecret())
+        .setOAuthAccessToken(twitterConfig.getToken())
+        .setOAuthAccessTokenSecret(twitterConfig.getTokenSecret());
+
+    this.twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
+    StatusListener listener =
+        new StatusListener() {
+          @Override
+          public void onException(Exception e) {
+            // Report through the logger instead of printing the stack trace to stderr.
+            LOG.error("Error reported by the Twitter stream", e);
+          }
+
+          @Override
+          public void onDeletionNotice(StatusDeletionNotice arg) {}
+
+          @Override
+          public void onScrubGeo(long userId, long upToStatusId) {}
+
+          @Override
+          public void onStallWarning(StallWarning warning) {}
+
+          @Override
+          public void onStatus(Status status) {
+            // Still best-effort (a failure here must not break the listener), but a dropped
+            // status is now logged rather than silently ignored.
+            try {
+              queue.offer(status);
+            } catch (RuntimeException e) {
+              LOG.warn("Dropping a status that could not be queued", e);
+            }
+          }
+
+          @Override
+          public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
+        };
+    FilterQuery tweetFilterQuery = new FilterQuery();
+    for (String filter : twitterConfig.getFilters()) {
+      tweetFilterQuery.track(filter);
+    }
+    tweetFilterQuery.language(twitterConfig.getLanguage());
+    this.twitterStream.addListener(listener);
+    this.twitterStream.filter(tweetFilterQuery);
+  }
+
+  /**
+   * Returns the connection registered for {@code twitterConfig}, creating and registering it on
+   * first use.
+   *
+   * @param twitterConfig configuration identifying the connection
+   * @return the shared connection for that configuration
+   */
+  public static TwitterConnection getInstance(TwitterConfig twitterConfig) {
+    // computeIfAbsent performs the lookup and the insertion as one atomic step on the map.
+    return INSTANCE_MAP.computeIfAbsent(twitterConfig, TwitterConnection::new);
+  }
+
+  /** Returns the queue into which the stream listener places received statuses. */
+  public BlockingQueue<Status> getQueue() {
+    return this.queue;
+  }
+
+  /** Shuts down the underlying Twitter stream. */
+  public void closeStream() {
+    this.twitterStream.shutdown();
+  }
+}
diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterIO.java b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterIO.java new file mode 100644 index 000000000000..f194859bbe8f --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterIO.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.complete.twitterstreamgenerator; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; + +/** + * An unbounded source for twitter + * stream. PTransforms for streaming live tweets from twitter. Reading from Twitter is supported by + * read() + * + *

Standard Twitter API can be read using a list of Twitter Config + * readStandardStream(List) + * + *

It allows multiple Twitter configurations to demonstrate how multiple twitter streams can be + * combined in a single pipeline. + * + *

{@code
+ * PCollection weatherData = pipeline.apply(
+ *      TwitterIO.readStandardStream(
+ *          Arrays.asList(
+ *                  new TwitterConfig.Builder()
+ *                      .setKey("")
+ *                      .setSecret("")
+ *                      .setToken("")
+ *                      .setTokenSecret("")
+ *                      .setFilters(Arrays.asList("", ""))
+ *                      .setLanguage("en")
+ *                      .setTweetsCount(10L)
+ *                      .setMinutesToRun(1)
+ *                      .build())));
+ * }
+ */
+public class TwitterIO {
+
+  /**
+   * Initializes the stream by converting input to a Twitter connection configuration.
+   *
+   * @param twitterConfigs list of twitter config
+   * @return PTransform of statuses
+   */
+  public static PTransform<PBegin, PCollection<String>> readStandardStream(
+      List<TwitterConfig> twitterConfigs) {
+    return new TwitterIO.Read.Builder().setTwitterConfig(twitterConfigs).build();
+  }
+
+  /**
+   * A {@link PTransform} to read from Twitter stream. It creates a collection from the given
+   * configurations and applies {@link ReadFromTwitterDoFn} to it.
+   */
+  private static class Read extends PTransform<PBegin, PCollection<String>> {
+    private final List<TwitterConfig> twitterConfigs;
+
+    private Read(Builder builder) {
+      this.twitterConfigs = builder.twitterConfigs;
+    }
+
+    @Override
+    public PCollection<String> expand(PBegin input) throws IllegalArgumentException {
+      return input.apply(Create.of(this.twitterConfigs)).apply(ParDo.of(new ReadFromTwitterDoFn()));
+    }
+
+    /** Builder for {@link Read}; the configuration list defaults to an empty list. */
+    private static class Builder {
+      List<TwitterConfig> twitterConfigs = new ArrayList<>();
+
+      TwitterIO.Read.Builder setTwitterConfig(final List<TwitterConfig> twitterConfigs) {
+        this.twitterConfigs = twitterConfigs;
+        return this;
+      }
+
+      TwitterIO.Read build() {
+        return new TwitterIO.Read(this);
+      }
+    }
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterStream.java b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterStream.java
new file mode 100644
index 000000000000..d1f8b2566c2b
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterStream.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.complete.twitterstreamgenerator; + +import java.util.Arrays; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@link TwitterStream} pipeline is a streaming pipeline which ingests data in JSON format from + * Twitter, and outputs the resulting records to console. Stream configurations are specified by the + * user as template parameters.
+ * + *

Concepts: API connectors and streaming; splittable DoFn and watermarking; logging + * + *

To execute this pipeline locally, specify the key, secret, token, token-secret and filters to + * filter the stream with, for your Twitter streaming app. You can also set the number of tweets + * (use setTweetsCount; default Long.MAX_VALUE) you wish to stream and/or the number of minutes + * to run the pipeline (use setMinutesToRun; default Integer.MAX_VALUE): + * + *

{@code
+ * new TwitterConfig
+ *        .Builder()
+ *        .setKey("")
+ *        .setSecret("")
+ *        .setToken("")
+ *        .setTokenSecret("")
+ *        .setFilters(Arrays.asList("", "")).build()
+ * }
+ * + *

To change the runner (does not work on Dataflow), specify: + * + *

{@code
+ * --runner=YOUR_SELECTED_RUNNER
+ * }
+ *
+ * See examples/java/README.md for instructions about how to configure different runners.
+ */
+public class TwitterStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TwitterStream.class);
+
+  /**
+   * Main entry point for pipeline execution.
+   *
+   * @param args Command line arguments to the pipeline.
+   */
+  public static void main(String[] args) {
+    // Build the pipeline from the command line so that flags such as --runner are honoured;
+    // with no arguments this yields the same default options as before.
+    Pipeline pipeline =
+        Pipeline.create(
+            org.apache.beam.sdk.options.PipelineOptionsFactory.fromArgs(args)
+                .withValidation()
+                .create());
+    PCollection<String> tweetStream =
+        pipeline
+            .apply(
+                "Create Twitter Connection Configuration",
+                TwitterIO.readStandardStream(
+                    Arrays.asList(
+                        new TwitterConfig.Builder()
+                            .setKey("")
+                            .setSecret("")
+                            .setToken("")
+                            .setTokenSecret("")
+                            .setFilters(Arrays.asList("", ""))
+                            .setLanguage("en")
+                            .setTweetsCount(10L)
+                            .setMinutesToRun(1)
+                            .build())))
+            // The trigger used to be built on a separate, discarded Window.configure() call and
+            // was never applied; it is now part of the windowing transform. A custom trigger
+            // requires the allowed lateness and accumulation mode to be stated explicitly.
+            .apply(
+                Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
+                    .triggering(
+                        Repeatedly.forever(
+                            AfterFirst.of(
+                                AfterPane.elementCountAtLeast(10),
+                                AfterProcessingTime.pastFirstElementInPane()
+                                    .plusDelayOf(Duration.standardMinutes(2)))))
+                    .withAllowedLateness(Duration.ZERO)
+                    .discardingFiredPanes());
+    tweetStream.apply(
+        "Output Tweets to console",
+        ParDo.of(
+            new DoFn<String, String>() {
+              @ProcessElement
+              public void processElement(
+                  @Element String element, OutputReceiver<String> receiver) {
+                LOG.debug("Output tweets: {}", element);
+                receiver.output(element);
+              }
+            }));
+
+    pipeline.run();
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/package-info.java
new file mode 100644
index 000000000000..87cdf672b868
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Example pipeline that streams tweets from the Twitter streaming API. */ +package org.apache.beam.examples.complete.twitterstreamgenerator; diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/twitterstreamgenerator/ReadFromTwitterDoFnTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/twitterstreamgenerator/ReadFromTwitterDoFnTest.java new file mode 100644 index 000000000000..23e768e2f603 --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/complete/twitterstreamgenerator/ReadFromTwitterDoFnTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.examples.complete.twitterstreamgenerator;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import twitter4j.Status;
+
+/** Tests {@link ReadFromTwitterDoFn} against mocked connections with pre-filled queues. */
+@RunWith(JUnit4.class)
+public class ReadFromTwitterDoFnTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public final ExpectedException expectedException = ExpectedException.none();
+  @Mock TwitterConnection twitterConnection1;
+  @Mock TwitterConnection twitterConnection2;
+  @Mock Status status1;
+  @Mock Status status2;
+  @Mock Status status3;
+  @Mock Status status4;
+  @Mock Status status5;
+  LinkedBlockingQueue<Status> queue1 = new LinkedBlockingQueue<>();
+  LinkedBlockingQueue<Status> queue2 = new LinkedBlockingQueue<>();
+
+  /** Stubs five statuses; the first three go to {@code queue1}, the last two to {@code queue2}. */
+  @Before
+  public void setUp() throws JsonProcessingException {
+    MockitoAnnotations.initMocks(this);
+    when(status1.getText()).thenReturn("Breaking News1");
+    when(status1.getCreatedAt()).thenReturn(new Date());
+    when(status2.getText()).thenReturn("Breaking News2");
+    when(status2.getCreatedAt()).thenReturn(new Date());
+    when(status3.getText()).thenReturn("Breaking News3");
+    when(status3.getCreatedAt()).thenReturn(new Date());
+    when(status4.getText()).thenReturn("Breaking News4");
+    when(status4.getCreatedAt()).thenReturn(new Date());
+    when(status5.getText()).thenReturn("Breaking News5");
+    when(status5.getCreatedAt()).thenReturn(new Date());
+    queue1.offer(status1);
+    queue1.offer(status2);
+    queue1.offer(status3);
+    queue2.offer(status4);
+    queue2.offer(status5);
+  }
+
+  /**
+   * Asserts that {@code tweets}, once sorted, equals {@code expected}. Sorting keeps the check
+   * independent of the order in which the collection is iterated. Static so that the PAssert
+   * lambdas calling it do not capture the test instance.
+   */
+  private static void assertTweets(Iterable<String> tweets, String... expected) {
+    List<String> output = new ArrayList<>();
+    tweets.forEach(output::add);
+    Collections.sort(output);
+    String[] actual = new String[output.size()];
+    IntStream.range(0, output.size()).forEach((i) -> actual[i] = output.get(i));
+    // JUnit's argument order is (message, expected, actual).
+    assertArrayEquals("Mismatch found in output", expected, actual);
+  }
+
+  @Test
+  public void testTwitterRead() {
+    TwitterConfig twitterConfig = new TwitterConfig.Builder().setTweetsCount(3L).build();
+    TwitterConnection.INSTANCE_MAP.put(twitterConfig, twitterConnection1);
+    when(twitterConnection1.getQueue()).thenReturn(queue1);
+    PCollection<String> result =
+        pipeline
+            .apply("Create Twitter Connection Configuration", Create.of(twitterConfig))
+            .apply(ParDo.of(new ReadFromTwitterDoFn()));
+    PAssert.that(result)
+        .satisfies(
+            pcollection -> {
+              assertTweets(pcollection, "Breaking News1", "Breaking News2", "Breaking News3");
+              return null;
+            });
+    pipeline.run();
+  }
+
+  @Test
+  public void testMultipleTwitterConfigs() {
+    TwitterConfig twitterConfig1 = new TwitterConfig.Builder().setTweetsCount(3L).build();
+    TwitterConfig twitterConfig2 = new TwitterConfig.Builder().setTweetsCount(2L).build();
+    TwitterConnection.INSTANCE_MAP.put(twitterConfig1, twitterConnection1);
+    TwitterConnection.INSTANCE_MAP.put(twitterConfig2, twitterConnection2);
+    when(twitterConnection1.getQueue()).thenReturn(queue1);
+    when(twitterConnection2.getQueue()).thenReturn(queue2);
+    PCollection<String> result =
+        pipeline
+            .apply(
+                "Create Twitter Connection Configuration",
+                Create.of(twitterConfig1, twitterConfig2))
+            .apply(ParDo.of(new ReadFromTwitterDoFn()));
+    PAssert.that(result)
+        .satisfies(
+            pcollection -> {
+              assertTweets(
+                  pcollection,
+                  "Breaking News1",
+                  "Breaking News2",
+                  "Breaking News3",
+                  "Breaking News4",
+                  "Breaking News5");
+              return null;
+            });
+    pipeline.run();
+  }
+}