From b3b0ff118cac3c0a5a10f9912b383bb0665c9a1b Mon Sep 17 00:00:00 2001 From: Chris Fregly Date: Wed, 16 Jul 2014 00:03:04 -0700 Subject: [PATCH 1/9] [SPARK-1981] Add AWS Kinesis streaming support --- assembly/pom.xml | 10 + bin/run-kinesis-example | 60 +++ bin/run-kinesis-example.cmd | 90 +++++ .../src/main/scala/SparkApp.scala | 7 + dev/audit-release/sbt_app_kinesis/build.sbt | 30 ++ .../src/main/scala/SparkApp.scala | 33 ++ dev/create-release/create-release.sh | 2 + docs/streaming-custom-receivers.md | 4 +- docs/streaming-programming-guide.md | 65 +++- extras/spark-kinesis-asl/pom.xml | 90 +++++ .../streaming/JavaKinesisWordCount.java | 310 ++++++++++++++++ .../src/main/resources/log4j.properties | 42 +++ .../examples/streaming/KinesisWordCount.scala | 345 ++++++++++++++++++ .../streaming/kinesis/CheckpointState.scala | 52 +++ .../streaming/kinesis/KinesisReceiver.scala | 122 +++++++ .../kinesis/KinesisRecordProcessor.scala | 148 ++++++++ .../kinesis/KinesisRecordSerializer.scala | 54 +++ .../KinesisStringRecordSerializer.scala | 47 +++ .../streaming/kinesis/KinesisUtils.scala | 151 ++++++++ .../src/test/resources/log4j.properties | 42 +++ .../kinesis/KinesisReceiverSuite.scala | 267 ++++++++++++++ pom.xml | 8 + project/SparkBuild.scala | 21 +- 23 files changed, 1992 insertions(+), 8 deletions(-) create mode 100755 bin/run-kinesis-example create mode 100755 bin/run-kinesis-example.cmd create mode 100644 dev/audit-release/sbt_app_kinesis/build.sbt create mode 100644 dev/audit-release/sbt_app_kinesis/src/main/scala/SparkApp.scala create mode 100644 extras/spark-kinesis-asl/pom.xml create mode 100644 extras/spark-kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java create mode 100644 extras/spark-kinesis-asl/src/main/resources/log4j.properties create mode 100644 extras/spark-kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala create mode 100644 extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/CheckpointState.scala create mode 100644 extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala create mode 100644 extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala create mode 100644 extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordSerializer.scala create mode 100644 extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisStringRecordSerializer.scala create mode 100644 extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala create mode 100644 extras/spark-kinesis-asl/src/test/resources/log4j.properties create mode 100644 extras/spark-kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala diff --git a/assembly/pom.xml b/assembly/pom.xml index 0c60b66c3daca..60cc5aef67098 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -173,6 +173,16 @@ + + spark-kinesis-asl + + + org.apache.spark + spark-kinesis-asl_${scala.binary.version} + ${project.version} + + + bigtop-dist + + 4.0.0 + + org.apache.spark + spark-parent + 1.1.0-SNAPSHOT + ../../pom.xml + + + + org.apache.spark + spark-kinesis-asl_2.10 + jar + Spark Kinesis Integration + + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + + + com.amazonaws + amazon-kinesis-client + 1.1.0 + + + com.amazonaws + aws-java-sdk 
+ 1.8.3 + + + org.scalatest + scalatest_${scala.binary.version} + test + + + org.mockito + mockito-all + test + + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + org.easymock + easymockclassextension + test + + + com.novocode + junit-interface + test + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.scalatest + scalatest-maven-plugin + + + + diff --git a/extras/spark-kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java b/extras/spark-kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java new file mode 100644 index 0000000000000..6f3a2454907ec --- /dev/null +++ b/extras/spark-kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.streaming; + +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.Milliseconds; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.dstream.DStream; +import org.apache.spark.streaming.kinesis.KinesisRecordSerializer; +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer; +import org.apache.spark.streaming.kinesis.KinesisUtils; + +import scala.Tuple2; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + +/** + * Java-friendly Kinesis Spark Streaming WordCount example + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the given stream. + * It then starts pulling from the tip of the given and at the given . 
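+ * (That is, one receiver is created per shard of the given stream name and endpoint, and records are read into Spark at the given batch interval.)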
+ * Because we're pulling from the tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts. + * This could lead to missed records if data is added to the stream while no KinesisReceivers are running. + * In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data + * depending on the checkpoint frequency. + * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency. + * Record processing should be idempotent when possible. + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: JavaKinesisWordCount + * is the name of the Kinesis stream (ie. mySparkStream) + * is the endpoint of the Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * is the batch interval in milliseconds (ie. 1000ms) + * + * Example: + * $ export AWS_ACCESS_KEY_ID= + * $ export AWS_SECRET_KEY= + * $ bin/run-kinesis-example \ + * org.apache.spark.examples.streaming.JavaKinesisWordCount mySparkStream https://kinesis.us-east-1.amazonaws.com 1000 + * + * There is a companion helper class called KinesisWordCountProducer which puts dummy data onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducer are provided in the class definition. + */ +public final class JavaKinesisWordCount { + private static final Pattern WORD_SEPARATOR = Pattern.compile(" "); + private static final Logger logger = Logger.getLogger(JavaKinesisWordCount.class); + + /** + * Make the constructor private to enforce singleton + */ + private JavaKinesisWordCount() { + } + + public static void main(String[] args) { + /** + * Check that all required args were passed in. + */ + if (args.length < 3) { + System.err.println("Usage: JavaKinesisWordCount "); + System.exit(1); + } + + /** + * (This was lifted from the StreamingExamples.scala in order to avoid the dependency on the spark-examples artifact.) + * Set reasonable logging levels for streaming if the user has not configured log4j. + */ + boolean log4jInitialized = Logger.getRootLogger().getAllAppenders() + .hasMoreElements(); + if (!log4jInitialized) { + /** We first log something to initialize Spark's default logging, then we override the logging level. */ + Logger.getRootLogger() + .info("Setting log level to [ERROR] for streaming example." 
+ + " To override add a custom log4j.properties to the classpath."); + Logger.getRootLogger().setLevel(Level.ERROR); + Logger.getLogger("org.apache.spark.examples.streaming").setLevel(Level.DEBUG); + } + + /** Populate the appropriate variables from the given args */ + String stream = args[0]; + String endpoint = args[1]; + Integer batchIntervalMillis = Integer.valueOf(args[2]); + + /** Create a Kinesis client in order to determine the number of shards for the given stream */ + AmazonKinesisClient KinesisClient = new AmazonKinesisClient( + new DefaultAWSCredentialsProviderChain()); + + /** Determine the number of shards from the stream */ + int numShards = KinesisClient.describeStream(stream) + .getStreamDescription().getShards().size(); + + /** In this example, we're going to create 1 Kinesis Worker/Receiver/DStreams for each stream shard */ + int numStreams = numShards; + + /** Must add 1 more thread than the number of receivers or the output won't show properly from the driver */ + int numSparkThreads = numStreams + 1; + + /** Set the app name */ + String app = "KinesisWordCount"; + + /** Setup the Spark config. */ + SparkConf sparkConfig = new SparkConf().setAppName(app).setMaster( + "local[" + numSparkThreads + "]"); + + /** + * Set the batch interval. + * Records will be pulled from the Kinesis stream and stored as a single DStream within Spark every batch interval. + */ + Duration batchInterval = Milliseconds.apply(batchIntervalMillis); + + /** + * It's recommended that you perform a Spark checkpoint between 5 and 10 times the batch interval. + * While this is the Spark checkpoint interval, we're going to use it for the Kinesis checkpoint interval, as well. + */ + Duration checkpointInterval = batchInterval.$times(5); + + /** Setup the StreamingContext */ + JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval); + + /** Setup the checkpoint directory used by Spark Streaming */ + jssc.checkpoint("/tmp/checkpoint"); + + /** Create the same number of Kinesis Receivers/DStreams as stream shards, then union them all */ + JavaDStream allStreams = KinesisUtils + .createJavaStream(jssc, app, stream, endpoint, checkpointInterval.milliseconds(), + InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2()); + /** Set the checkpoint interval */ + allStreams.checkpoint(checkpointInterval); + for (int i = 1; i < numStreams; i++) { + /** Create a new Receiver/DStream for each stream shard */ + JavaDStream dStream = KinesisUtils + .createJavaStream(jssc, app, stream, endpoint, checkpointInterval.milliseconds(), + InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2()); + /** Set the Spark checkpoint interval */ + dStream.checkpoint(checkpointInterval); + + /** Union with the existing streams */ + allStreams = allStreams.union(dStream); + } + + /** This implementation uses the String-based KinesisRecordSerializer impl */ + final KinesisRecordSerializer recordSerializer = new KinesisStringRecordSerializer(); + + /** + * Split each line of the union'd DStreams into multiple words using flatMap to produce the collection. + * Convert lines of byte[] to multiple Strings by first converting to String, then splitting on WORD_SEPARATOR + * We're caching the result here so that we can use it later without having to re-materialize the underlying RDDs. 
+ */ + JavaDStream words = allStreams + .flatMap(new FlatMapFunction() { + /** + * Convert lines of byte[] to multiple words split by WORD_SEPARATOR + * @param byte array + * @return iterable of words split by WORD_SEPARATOR + */ + @Override + public Iterable call(byte[] line) { + return Lists.newArrayList(WORD_SEPARATOR.split(recordSerializer.deserialize(line))); + } + }).cache(); + + /** windowInterval must be a multiple of the batchInterval */ + Duration windowInterval = batchInterval.$times(5); + + /** slideInterval must be a multiple of the batchInterval */ + Duration slideInterval = batchInterval.$times(1); + + /** + * Map each word to a (word, 1) tuple so we can reduce/aggregate later. + * We're caching the result here so that we can use it later without having + * to re-materialize the underlying RDDs. + */ + JavaPairDStream wordCounts = words.mapToPair( + new PairFunction() { + /** + * Create the (word, 1) tuple + * @param word + * @return (word, 1) tuple + */ + @Override + public Tuple2 call(String s) { + return new Tuple2(s, 1); + } + }); + + /** + * Reduce/aggregate by key + * We're caching the result here so that we can use it later without having + * to re-materialize the underlying RDDs. + */ + JavaPairDStream wordCountsByKey = wordCounts.reduceByKey( + new Function2() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }).cache(); + + /** + * Reduce/aggregate by key for the given window. + * We're using the inverse-function (left - right) optimization over the sliding window per the Window Operations described at the following url: + * http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations + */ + JavaPairDStream wordCountsByKeyAndWindow = wordCountsByKey.reduceByKeyAndWindow( + new Function2() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }, windowInterval, slideInterval); + + /** + * Sort and print the word counts by key and window. + * This is an Output Operation and will materialize the DStream. + */ + sortAndPrint("Word Counts By Key and Window", wordCountsByKeyAndWindow); + + Function2, Optional, Optional> updateTotals = + new Function2, Optional, Optional>() { + @Override public Optional call(List newCounts, Optional currentCount) { + Integer currentSum = 0; + if (currentCount.isPresent()) { + currentSum = currentCount.get(); + } + Integer newSum = currentSum; + + for (Integer newCount : newCounts) { + newSum += newCount; + } + return Optional.of(newSum); + } + }; + + /** + * Calculate the running totals using the updateTotals method. + */ + JavaPairDStream wordTotalsByKey = wordCountsByKey.updateStateByKey(updateTotals); + + /** + * Sort and print the running word totals. + * This is an Output Operation and will materialize the DStream. + */ + sortAndPrint("Word Count Totals By Key", wordTotalsByKey); + + /** Start the streaming context and await termination */ + jssc.start(); + jssc.awaitTermination(); + } + + /** + * Sort and print the given dstream. + * This is an Output Operation that will materialize the underlying DStream. + * Everything up to this point is a lazy Transformation Operation. 
+ * + * @param description of the dstream for logging purposes + * @param dstream to sort and print + */ + private static void sortAndPrint(final String description, JavaPairDStream dstream) { + dstream.foreachRDD( + new Function, Void>() { + public Void call(JavaPairRDD batch) { + JavaPairRDD sortedBatch = batch.sortByKey(true); + logger.info(description); + for (Object wordCount: sortedBatch.collect()) { + logger.info(wordCount); + } + + return null; + } + }); + } +} diff --git a/extras/spark-kinesis-asl/src/main/resources/log4j.properties b/extras/spark-kinesis-asl/src/main/resources/log4j.properties new file mode 100644 index 0000000000000..ad789341e62c9 --- /dev/null +++ b/extras/spark-kinesis-asl/src/main/resources/log4j.properties @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=WARN, console + +# File appender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Console appender +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + +# Settings to quiet third party logs that are too verbose +log4j.logger.org.eclipse.jetty=WARN +log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR +log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO +log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO + +# Log all Kinesis Streaming messages +log4j.logger.org.apache.spark.examples.streaming=DEBUG +log4j.logger.org.apache.spark.streaming.kinesis=DEBUG diff --git a/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala new file mode 100644 index 0000000000000..0a0cccb49433d --- /dev/null +++ b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import java.nio.ByteBuffer +import org.apache.log4j.Level +import org.apache.log4j.Logger +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.Milliseconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer +import org.apache.spark.streaming.kinesis.KinesisUtils +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.model.PutRecordRequest +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.dstream.DStream + +/** + * Kinesis Spark Streaming WordCount example. + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the given stream. + * It then starts pulling from the tip of the given and at the given . + * Because we're pulling from the tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts. + * This could lead to missed records if data is added to the stream while no KinesisReceivers are running. + * In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data + * depending on the checkpoint frequency. + * + * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency. + * Record processing should be idempotent when possible. + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: KinesisWordCount + * is the name of the Kinesis stream (ie. mySparkStream) + * is the endpoint of the Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * is the batch interval in millis (ie. 1000ms) + * + * Example: + * $ export AWS_ACCESS_KEY_ID= + * $ export AWS_SECRET_KEY= + * $ bin/run-kinesis-example \ + * org.apache.spark.examples.streaming.KinesisWordCount mySparkStream https://kinesis.us-east-1.amazonaws.com 100 + * + * There is a companion helper class below called KinesisWordCountProducer which puts dummy data onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducer are provided in that class definition. 
+ */ +object KinesisWordCount extends Logging { + val WordSeparator = " " + + def main(args: Array[String]) { +/** + * Check that all required args were passed in. + */ + if (args.length < 3) { + System.err.println("Usage: KinesisWordCount ") + System.exit(1) + } + + /** + * (This was lifted from the StreamingExamples.scala in order to avoid the dependency on the spark-examples artifact.) + * Set reasonable logging levels for streaming if the user has not configured log4j. + */ + val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements + if (!log4jInitialized) { + /** We first log something to initialize Spark's default logging, then we override the logging level. */ + logInfo("Setting log level to [INFO] for streaming example." + + " To override add a custom log4j.properties to the classpath.") + + Logger.getRootLogger().setLevel(Level.INFO) + Logger.getLogger("org.apache.spark.examples.streaming").setLevel(Level.DEBUG); + } + + /** Populate the appropriate variables from the given args */ + val Array(stream, endpoint, batchIntervalMillisStr) = args + val batchIntervalMillis = batchIntervalMillisStr.toInt + + /** Create a Kinesis client in order to determine the number of shards for the given stream */ + val KinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()); + + /** Determine the number of shards from the stream */ + val numShards = KinesisClient.describeStream(stream).getStreamDescription().getShards().size() + + /** In this example, we're going to create 1 Kinesis Worker/Receiver/DStreams for each stream shard */ + val numStreams = numShards + + /** Must add 1 more thread than the number of receivers or the output won't show properly from the driver */ + val numSparkThreads = numStreams + 1 + + /** Set the app name */ + val app = "KinesisWordCount" + + /** Setup the Spark config. */ + val sparkConfig = new SparkConf().setAppName(app).setMaster(s"local[$numSparkThreads]") + + /** + * Set the batch interval. + * Records will be pulled from the Kinesis stream and stored as a single DStream within Spark every batch interval. + */ + val batchInterval = Milliseconds(batchIntervalMillis) + + /** + * It's recommended that you perform a Spark checkpoint between 5 and 10 times the batch interval. + * While this is the Spark checkpoint interval, we're going to use it for the Kinesis checkpoint interval, as well. 
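+ * For example, a 1000 ms batch interval yields the 5000 ms checkpoint interval computed below (batchInterval * 5).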
+ */ + val checkpointInterval = batchInterval * 5 + + /** Setup the StreamingContext */ + val ssc = new StreamingContext(sparkConfig, batchInterval) + + /** Setup the checkpoint directory used by Spark Streaming */ + ssc.checkpoint("/tmp/checkpoint"); + + /** Create the same number of Kinesis Receivers/DStreams as stream shards, then union them all */ + var allStreams: DStream[Array[Byte]] = KinesisUtils.createStream(ssc, app, stream, endpoint, checkpointInterval.milliseconds, + InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2) + /** Set the checkpoint interval */ + allStreams.checkpoint(checkpointInterval) + for (i <- 1 until numStreams) { + /** Create a new Receiver/DStream for each stream shard */ + val dStream = KinesisUtils.createStream(ssc, app, stream, endpoint, checkpointInterval.milliseconds, + InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2) + /** Set the Spark checkpoint interval */ + dStream.checkpoint(checkpointInterval) + + /** Union with the existing streams */ + allStreams = allStreams.union(dStream) + } + + /** This implementation uses the String-based KinesisRecordSerializer impl */ + val recordSerializer = new KinesisStringRecordSerializer() + + /** + * Sort and print the given dstream. + * This is an Output Operation that will materialize the underlying DStream. + * Everything up to this point is a lazy Transformation Operation. + * + * @param description of the dstream for logging purposes + * @param dstream to sort and print + */ + def sortAndPrint(description: String, dstream: DStream[(String,Int)]) = { + dstream.foreachRDD((batch, endOfWindowTime) => { + val sortedBatch = batch.sortByKey(true) + logInfo(s"$description @ $endOfWindowTime") + sortedBatch.collect().foreach( + wordCount => logInfo(s"$wordCount")) + }) + } + + /** + * Split each line of the union'd DStreams into multiple words using flatMap to produce the collection. + * Convert lines of Array[Byte] to multiple Strings by first converting to String, then splitting on WORD_SEPARATOR + * We're caching the result here so that we can use it later without having to re-materialize the underlying RDDs. + */ + val words = allStreams.flatMap(line => recordSerializer.deserialize(line).split(WordSeparator)).cache() + + /** windowInterval must be a multiple of the batchInterval */ + val windowInterval = batchInterval * 5 + + /** slideInterval must be a multiple of the batchInterval */ + val slideInterval = batchInterval * 1 + + /** + * Map each word to a (word, 1) tuple so we can reduce/aggregate later. + * We're caching the result here so that we can use it later without having + * to re-materialize the underlying RDDs. + */ + val wordCounts = words.map(word => (word, 1)) + + /** + * Reduce/aggregate by key. + * We're caching the result here so that we can use it later without having + * to re-materialize the underlying RDDs. + */ + val wordCountsByKey = wordCounts.reduceByKey((left, right) => left + right) + + /** + * Reduce/aggregate by key for the given window. + * We're using the inverse-function (left - right) optimization over the sliding window per the Window Operations described at the following url: + * http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations + */ + val wordCountsByKeyAndWindow = wordCountsByKey.reduceByKeyAndWindow((left, right) => left + right, (left, right) => left - right, windowInterval, slideInterval) + + /** + * Sort and print the word counts by key and window. + * This is an Output Operation and will materialize the DStream. 
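+ * (The windowed counts printed here are maintained incrementally thanks to the inverse function above: on each slide, newWindowCount = previousWindowCount + countsEnteringTheWindow - countsLeavingTheWindow, instead of recomputing the entire window.)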
+ * + */ + sortAndPrint("Word Counts By Key and Window", wordCountsByKeyAndWindow) + + /** + * Update the running totals of words. + * + * @param sequence of new counts + * @param current running total (could be None if no current count exists) + */ + def updateTotals = (newCounts: Seq[Int], currentCounts: Option[Int]) => { + val newCount = newCounts.foldLeft(0)((left, right) => left + right) + val currentCount = currentCounts.getOrElse(0) + Some(newCount + currentCount) + } + + /** + * Calculate the running totals using the updateTotals method. + */ + val wordTotalsByKey = wordCountsByKey.updateStateByKey[Int](updateTotals) + + /** + * Sort and print the running word totals. + * This is an Output Operation and will materialize the DStream. + */ + sortAndPrint("Word Count Totals By Key", wordTotalsByKey) + + /** Start the streaming context and await termination */ + ssc.start() + ssc.awaitTermination() + } +} + +/** + * Usage: KinesisWordCountProducer + * is the name of the Kinesis stream (ie. mySparkStream) + * is the endpoint of the Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * is the rate of records per second to put onto the stream + * is the rate of records per second to put onto the stream + * + * Example: + * $ export AWS_ACCESS_KEY_ID= + * $ export AWS_SECRET_KEY= + * $ bin/run-kinesis-example \ + * org.apache.spark.examples.streaming.KinesisWordCountProducer mySparkStream https://kinesis.us-east-1.amazonaws.com 10 5 + */ +private[streaming] +object KinesisWordCountProducer extends Logging { + val MaxRandomInts = 10 + + def main(args: Array[String]) { + if (args.length < 4) { + System.err.println("Usage: KinesisWordCountProducer ") + System.exit(1) + } + + /** + * (This was lifted from the StreamingExamples.scala in order to avoid the dependency on the spark-examples artifact.) + * Set reasonable logging levels for streaming if the user has not configured log4j. + */ + val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements + if (!log4jInitialized) { + /** We first log something to initialize Spark's default logging, then we override the logging level. */ + logInfo("Setting log level to [INFO] for streaming example." 
+ + " To override add a custom log4j.properties to the classpath.") + + Logger.getRootLogger().setLevel(Level.INFO) + Logger.getLogger("org.apache.spark.examples.streaming").setLevel(Level.DEBUG); + } + + /** Populate the appropriate variables from the given args */ + val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args + + /** Generate the records and return the totals */ + val totals: Seq[(Int, Int)] = generate(stream, endpoint, recordsPerSecond.toInt, wordsPerRecord.toInt) + + logInfo("Totals") + /** Print the array of (index, total) tuples */ + totals.foreach(total => logInfo(total.toString())) + } + + def generate(stream: String, endpoint: String, recordsPerSecond: Int, wordsPerRecord: Int): Seq[(Int, Int)] = { + val WORD_SEPARATOR = " " + + /** Create the Kinesis client */ + val KinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()) + + logInfo(s"Putting records onto stream $stream and endpoint $endpoint at a rate of $recordsPerSecond records per second and $wordsPerRecord words per record"); + + /** Create the String-based record serializer */ + val recordSerializer = new KinesisStringRecordSerializer() + + val totals = new Array[Int](MaxRandomInts) + /** Put String records onto the stream per the given recordPerSec and wordsPerRecord */ + for (i <- 1 to 5) { + /** Generate recordsPerSec records to put onto the stream */ + val records = (1 to recordsPerSecond.toInt).map { recordNum => + /** Randomly generate each wordsPerRec words between 0 (inclusive) and MAX_RANDOM_INTS (exclusive) */ + val data = (1 to wordsPerRecord.toInt).map(x => { + /** Generate the random int */ + val randomInt = Random.nextInt(MaxRandomInts) + + /** Keep track of the totals */ + totals(randomInt) += 1 + + /** Convert the Int to a String */ + randomInt.toString() + }) + /** Create a String of randomInts separated by WORD_SEPARATOR */ + .mkString(WORD_SEPARATOR) + + /** Create a partitionKey based on recordNum */ + val partitionKey = s"partitionKey-$recordNum" + + /** Create a PutRecordRequest with an Array[Byte] version of the data */ + val putRecordRequest = new PutRecordRequest().withStreamName(stream).withPartitionKey(partitionKey) + .withData(ByteBuffer.wrap(recordSerializer.serialize(data))); + + /** Put the record onto the stream and capture the PutRecordResult */ + val putRecordResult = KinesisClient.putRecord(putRecordRequest); + + logInfo(s"Successfully put record with partitionKey $partitionKey and shardId ${putRecordResult.getShardId()} and data $data") + } + + /** Sleep for a second */ + Thread.sleep(1000) + } + + /** Convert the totals to (index, total) tuple */ + (0 to (MaxRandomInts - 1)).zip(totals) + } +} diff --git a/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/CheckpointState.scala b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/CheckpointState.scala new file mode 100644 index 0000000000000..a28d022cb61c8 --- /dev/null +++ b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/CheckpointState.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.SystemClock + +/** + * This is a helper class for managing checkpoint clocks. + * + * @param checkpoint interval in millis + * @param current clock. if none specified, will default to current SystemClock + */ +class CheckpointState(checkpointIntervalMillis: Long, currentClock: Clock = new SystemClock()) extends Logging { + /** + * Initialize the checkpoint clock using the given currentClock + checkpointIntervalMillis + */ + val checkpointClock = new ManualClock() + checkpointClock.setTime(currentClock.currentTime() + checkpointIntervalMillis) + + /** + * Check if it's time to checkpoint based on the current time and the derived time for the next checkpoint + * + * @return true if it's time to checkpoint + */ + def shouldCheckpoint(): Boolean = { + new SystemClock().currentTime() > checkpointClock.currentTime() + } + + /** + * Advance the checkpoint clock by the checkpoint interval. + */ + def advanceCheckpoint() = { + checkpointClock.addToTime(checkpointIntervalMillis) + } +} diff --git a/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala new file mode 100644 index 0000000000000..98eed6eb196d9 --- /dev/null +++ b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import java.net.InetAddress +import java.util.UUID +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker +import java.nio.ByteBuffer +import org.apache.spark.streaming.util.SystemClock + +/** + * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. + * This implementation relies on the Kinesis Client Library (KCL) Worker as described here: + * https://github.com/awslabs/amazon-kinesis-client + * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here: + * http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor. + * + * @param app name + * @param Kinesis stream name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * @param persistence strategy for RDDs and DStreams. + */ +private[streaming] class KinesisReceiver( + app: String, + stream: String, + endpoint: String, + checkpointIntervalMillis: Long, + initialPositionInStream: InitialPositionInStream, + storageLevel: StorageLevel) + extends Receiver[Array[Byte]](storageLevel) with Logging { receiver => + + /** + * The lazy val's below will get instantiated in the remote Executor after the closure is shipped to the Spark Worker. + * These are all lazy because they're from third-party Amazon libraries and are not Serializable. + * If they're not marked lazy, they will cause NotSerializableExceptions when they're shipped to the Spark Worker. + */ + + /** + * workerId is lazy because we want the address of the actual Worker where the code runs - not the Driver's ip address. + * This makes a difference when running in a cluster. 
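+   * (If workerId were a strict val, it would be evaluated on the driver when the receiver is constructed, and every executor would then report the driver's address; the lazy val defers the lookup until the receiver is actually running on an executor.)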
+ */ + lazy val workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID() + + /** + * This impl uses the DefaultAWSCredentialsProviderChain per the following url: + * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html + * and searches for credentials in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI + * Instance profile credentials delivered through the Amazon EC2 metadata service + */ + lazy val credentialsProvider = new DefaultAWSCredentialsProviderChain() + + /** Create a KCL config instance. */ + lazy val KinesisClientLibConfiguration = new KinesisClientLibConfiguration(app, stream, credentialsProvider, workerId) + .withKinesisEndpoint(endpoint).withInitialPositionInStream(initialPositionInStream).withTaskBackoffTimeMillis(500) + + /** + * RecordProcessorFactory creates impls of IRecordProcessor. + * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the IRecordProcessor.processRecords() method. + * We're using our custom KinesisRecordProcessor in this case. + */ + lazy val recordProcessorFactory: IRecordProcessorFactory = new IRecordProcessorFactory { + override def createProcessor: IRecordProcessor = new KinesisRecordProcessor(receiver, workerId, KinesisUtils.createCheckpointState(checkpointIntervalMillis)) + } + + /** + * Create a Kinesis Worker. + * This is the core client abstraction from the Kinesis Client Library (KCL). + * We pass the RecordProcessorFactory from above as well as the KCL config instance. + * A Kinesis Worker can process 1..* shards from the given stream - each with its own RecordProcessor. + */ + lazy val worker: Worker = new Worker(recordProcessorFactory, KinesisClientLibConfiguration); + + /** + * This is called when the KinesisReceiver starts and must be non-blocking. + * The KCL creates and manages the receiving/processing thread pool through the Worker.run() method. + */ + override def onStart() { + logInfo(s"Starting receiver with workerId $workerId") + worker.run() + } + + /** + * This is called when the KinesisReceiver stops. + * The KCL worker.shutdown() method stops the receiving/processing threads. + * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown. + */ + override def onStop() { + logInfo(s"Shutting down receiver with workerId $workerId") + worker.shutdown() + } +} diff --git a/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala new file mode 100644 index 0000000000000..8dd24501fe381 --- /dev/null +++ b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.List +import scala.collection.JavaConversions.asScalaBuffer +import scala.collection.mutable.ArrayBuffer +import org.apache.spark.Logging +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.SystemClock +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason +import com.amazonaws.services.kinesis.model.Record +import scala.compat.Platform +import org.apache.spark.streaming.util.Clock + +/** + * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor. + * This implementation operates on the Array[Byte] from the KinesisReceiver. + * The Kinesis Worker creates an instance of this KinesisRecordProcessor upon startup. + * + * @param Kinesis receiver + * @param workerId for logging purposes + * @param checkpoint utils + * @param Kinesis checkpoint interval (millis) + */ +private[streaming] class KinesisRecordProcessor( + receiver: KinesisReceiver, + workerId: String, + checkpointState: CheckpointState) extends IRecordProcessor with Logging { + + /** shardId to be populated during initialize() */ + var shardId: String = _ + + /** + * The Kinesis Client Library calls this method during IRecordProcessor initialization. + * + * @param shardId assigned by the KCL to this particular RecordProcessor. + */ + override def initialize(shardId: String) { + logInfo(s"Initialize: Initializing workerId $workerId with shardId $shardId") + + this.shardId = shardId + } + + /** + * This method is called by the KCL when a batch of records is pulled from the Kinesis stream. + * This is the record-processing bridge between the KCL's IRecordProcessor.processRecords() + * and Spark Streaming's Receiver.store(). + * + * @param list of records from the Kinesis stream shard + * @param checkpointer used to update Kinesis when this batch has been processed/stored in the DStream + */ + override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) { + if (!receiver.isStopped()) { + try { + /** + * Convert the list of records to a list of Array[Byte] + * Note: If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming + * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the + * internally-configured Spark serializer (kryo, etc). + * This is not desirable, so we instead store a raw Array[Byte] and decouple + * ourselves from the internal serialization strategy. + */ + val batchByteArrays = new ArrayBuffer[Array[Byte]](batch.size()) + batchByteArrays ++= batch.map(record => record.getData().array()) + + /** Store the list of Array[Byte] in Spark */ + KinesisUtils.retry(receiver.store(batchByteArrays), 4, 500) + logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId") + + /** + * Checkpoint the sequence number of the last record successfully processed/stored in the batch. 
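+ * (The KCL stores each checkpoint in its DynamoDB table, so a restarted worker resumes from the last checkpointed sequence number rather than from the configured initial position in the stream.)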
+ * In this implementation, we're checkpointing after the given checkpointIntervalMillis. + */ + if (checkpointState.shouldCheckpoint()) { + /** Perform the checkpoint */ + KinesisUtils.retry(checkpointer.checkpoint(), 4, 500) + + /** Update the next checkpoint time */ + checkpointState.advanceCheckpoint() + + logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint of ${batch.size} records for shardId $shardId") + logDebug(s"Checkpoint: Next checkpoint is at ${checkpointState.checkpointClock.currentTime()} for shardId $shardId") + } + } catch { + case e: Throwable => { + /** + * If there is a failure within the batch, the batch will not be checkpointed. + * This will potentially cause records since the last checkpoint to be processed more than once. + */ + logError(s"Exception: WorkerId $workerId encountered and exception while storing or checkpointing a batch for workerId $workerId and shardId $shardId.", e) + + /** Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */ + throw e + } + } + } else { + /** RecordProcessor has been stopped. */ + logInfo(s"Stopped: The Spark KinesisReceiver has stopped for workerId $workerId and shardId $shardId. No more records will be processed.") + } + } + + /** + * Kinesis Client Library is shutting down this Worker for 1 of 2 reasons: + * 1) the stream is resharding by splitting or merging adjacent shards (ShutdownReason.TERMINATE) + * 2) the failed or latent Worker has stopped sending heartbeats for whatever reason (ShutdownReason.ZOMBIE) + * + * @param checkpointer used to performn a Kinesis checkpoint for ShutdownReason.TERMINATE + * @param shutdown reason (ShutdownReason.TERMINATE or ShutdownReason.ZOMBIE) + */ + override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) { + logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason") + reason match { + /** + * TERMINATE Use Case. Checkpoint. + * Checkpoint to indicate that all records from the shard have been drained and processed. + * It's now OK to read from the new shards that resulted from a resharding event. + */ + case ShutdownReason.TERMINATE => KinesisUtils.retry(checkpointer.checkpoint(), 4, 500) + + /** + * ZOMBIE Use Case. NoOp. + * No checkpoint because other workers may have taken over and already started processing the same records. + * This may lead to records being processed more than once. + */ + case ShutdownReason.ZOMBIE => + + /** Unknown reason. NoOp */ + case _ => + } + } +} diff --git a/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordSerializer.scala b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordSerializer.scala new file mode 100644 index 0000000000000..172c9b14eebca --- /dev/null +++ b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordSerializer.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver +import com.amazonaws.auth.AWSCredentialsProvider +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import org.apache.spark.SparkConf +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.SparkContext +import org.apache.spark.streaming.Duration +import org.apache.spark.streaming.Seconds +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import scala.reflect.ClassTag +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import java.nio.ByteBuffer + +/** + * Convert custom types to/from Array[Byte]. + * @tparam type to serialize/deserialize + */ +private[streaming] trait KinesisRecordSerializer[T] extends Serializable { + /** + * Convert type to Array[Byte] + * + * @param type to serialize + * @return byte array + */ + def serialize(t: T): Array[Byte] + + /** + * Convert Array[Byte] to type + * + * @param byte array + * @return deserialized type + */ + def deserialize(array: Array[Byte]): T +} diff --git a/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisStringRecordSerializer.scala b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisStringRecordSerializer.scala new file mode 100644 index 0000000000000..4fd9c39b3c535 --- /dev/null +++ b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisStringRecordSerializer.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.nio.ByteBuffer +import java.nio.charset.Charset +import java.nio.CharBuffer +import org.apache.spark.Logging + +/** + * Implementation of KinesisRecordSerializer to convert Array[Byte] to/from String. 
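+ *
+ * A minimal round-trip sketch:
+ * {{{
+ *   val serializer = new KinesisStringRecordSerializer()
+ *   val bytes: Array[Byte] = serializer.serialize("spark kinesis")
+ *   val string: String = serializer.deserialize(bytes)   // "spark kinesis"
+ * }}}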
+ */ +class KinesisStringRecordSerializer extends KinesisRecordSerializer[String] with Logging { + /** + * Convert String to Array[Byte] + * + * @param string to serialize + * @return byte array + */ + def serialize(string: String): Array[Byte] = { + string.getBytes() + } + + /** + * Convert Array[Byte] to String + * + * @param byte array + * @return deserialized string + */ + def deserialize(array: Array[Byte]): String = { + new String(array) + } +} diff --git a/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala new file mode 100644 index 0000000000000..0c3a3cc0043a6 --- /dev/null +++ b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import org.apache.spark.Logging +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.SystemClock + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + */ +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. 
+ * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory and on-disk to 2 nodes total (primary and secondary) + * + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( + ssc: StreamingContext, + app: String, + stream: String, + endpoint: String, + checkpointIntervalMillis: Long, + initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): ReceiverInputDStream[Array[Byte]] = { + + ssc.receiverStream(new KinesisReceiver(app, stream, endpoint, checkpointIntervalMillis, initialPositionInStream, storageLevel)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * + * @param JavaStreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory and on-disk to 2 nodes total (primary and secondary) + * + * @return JavaReceiverInputDStream[Array[Byte]] + */ + def createJavaStream( + jssc: JavaStreamingContext, + app: String, + stream: String, + endpoint: String, + checkpointIntervalMillis: Long, + initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): JavaReceiverInputDStream[Array[Byte]] = { + + jssc.receiverStream(new KinesisReceiver(app, stream, endpoint, checkpointIntervalMillis, initialPositionInStream, storageLevel)) + } + + /** + * Create checkpoint state using the existing system clock + * @param checkpointIntervalMillis + */ + def createCheckpointState(checkpointIntervalMillis: Long): CheckpointState = { + new CheckpointState(checkpointIntervalMillis) + } + + /** + * Retry the given amount of times with a random backoff time (millis) less than the given maxBackOffMillis + * + * @param expression expression to evalute + * @param numRetriesLeft number of retries left + * @param maxBackOffMillis: max millis between retries + * + * @return Evaluation of the given expression + * @throws Unretryable exception, unexpected exception, + * or any exception that persists after numRetriesLeft reaches 0 + */ + @annotation.tailrec + def retry[T](expression: => T, numRetriesLeft: Int, maxBackOffMillis: Int): T = { + util.Try { expression } match { + /** If the function succeeded, evaluate to x. 
*/ + case util.Success(x) => x + /** If the function failed, either retry or throw the exception */ + case util.Failure(e) => e match { + /** Retry: Throttling or other Retryable exception has occurred */ + case _: ThrottlingException | _: KinesisClientLibDependencyException if numRetriesLeft > 1 => { + val backOffMillis = Random.nextInt(maxBackOffMillis) + Thread.sleep(backOffMillis) + logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e) + retry(expression, numRetriesLeft - 1, maxBackOffMillis) + } + /** Throw: Shutdown has been requested by the Kinesis Client Library.*/ + case _: ShutdownException => { + logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e) + throw e + } + /** Throw: Non-retryable exception has occurred with the Kinesis Client Library */ + case _: InvalidStateException => { + logError(s"InvalidStateException: Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library. Table likely doesn't exist.", e) + throw e + } + /** Throw: Unexpected exception has occurred */ + case _ => { + logError(s"Unexpected, non-retryable exception.", e) + throw e + } + } + } + } +} diff --git a/extras/spark-kinesis-asl/src/test/resources/log4j.properties b/extras/spark-kinesis-asl/src/test/resources/log4j.properties new file mode 100644 index 0000000000000..f6bf583b740cd --- /dev/null +++ b/extras/spark-kinesis-asl/src/test/resources/log4j.properties @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=WARN, console + +# File appender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Console appender +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + +# Settings to quiet third party logs that are too verbose +log4j.logger.org.eclipse.jetty=WARN +log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR +log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO +log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO + +# Log all Kinesis Streaming messages +log4j.logger.org.apache.spark.examples.streaming=DEBUG +log4j.logger.org.apache.spark.streaming.Kinesis=DEBUG \ No newline at end of file diff --git a/extras/spark-kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/spark-kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala new file mode 100644 index 0000000000000..3d86a7a17fa12 --- /dev/null +++ b/extras/spark-kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import java.nio.ByteBuffer +import java.nio.CharBuffer +import java.nio.charset.Charset +import scala.collection.JavaConversions.seqAsJavaList +import org.scalatest.BeforeAndAfter +import org.scalatest.FunSuite +import org.scalatest.Matchers +import org.scalatest.PrivateMethodTester +import org.scalatest.mock.EasyMockSugar +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.model.Record +import scala.collection.mutable.ArrayBuffer +import org.apache.spark.streaming.receiver.Receiver +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.SystemClock +import org.apache.spark.streaming.util.Clock + +/** + * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor + */ +class KinesisReceiverSuite extends FunSuite with Matchers with BeforeAndAfter with EasyMockSugar { + val app = "TestKinesisReceiver" + val stream = "mySparkStream" + val endpoint = "endpoint-url" + val workerId = "dummyWorkerId" + val shardId = "dummyShardId" + + val record1 = new Record() + record1.setData(ByteBuffer.wrap("Spark In Action".getBytes())) + val record2 = new Record() + record2.setData(ByteBuffer.wrap("Learning Spark".getBytes())) + val batch = List[Record](record1, record2) + val expectedArrayBuffer = new ArrayBuffer[Array[Byte]]() += record1.getData().array() += record2.getData().array() + + var receiverMock: KinesisReceiver = _ + var checkpointerMock: IRecordProcessorCheckpointer = _ + var checkpointClockMock: ManualClock = _ + var checkpointStateMock: CheckpointState = _ + var currentClockMock: Clock = _ + + before { + receiverMock = mock[KinesisReceiver] + checkpointerMock = mock[IRecordProcessorCheckpointer] + checkpointClockMock = mock[ManualClock] + checkpointStateMock = mock[CheckpointState] + currentClockMock = mock[Clock] + } + + test("process records including store and checkpoint") { + val expectedCheckpointIntervalMillis = 10 + expecting { + receiverMock.isStopped().andReturn(false).once() + receiverMock.store(expectedArrayBuffer).once() + checkpointStateMock.shouldCheckpoint().andReturn(true).once() + checkpointerMock.checkpoint().once() + checkpointStateMock.advanceCheckpoint().once() + } + whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) { + val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock) + recordProcessor.processRecords(batch, checkpointerMock) + } + } + + test("shouldn't store and checkpoint when receiver is stopped") { + expecting { + receiverMock.isStopped().andReturn(true).once() + } + whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) { + val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock) + recordProcessor.processRecords(batch, checkpointerMock) + } + } + 
+ test("shouldn't checkpoint when exception occurs during store") { + expecting { + receiverMock.isStopped().andReturn(false).once() + receiverMock.store(expectedArrayBuffer).andThrow(new RuntimeException()).once() + } + whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) { + intercept[RuntimeException] { + val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock) + recordProcessor.processRecords(batch, checkpointerMock) + } + } + } + + test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") { + expecting { + currentClockMock.currentTime().andReturn(0).once() + } + whenExecuting(currentClockMock) { + val checkpointIntervalMillis = 10 + val checkpointState = new CheckpointState(checkpointIntervalMillis, currentClockMock) + assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis) + } + } + + test("should checkpoint if we have exceeded the checkpoint interval") { + expecting { + currentClockMock.currentTime().andReturn(0).once() + } + whenExecuting(currentClockMock) { + val checkpointState = new CheckpointState(Long.MinValue, currentClockMock) + assert(checkpointState.shouldCheckpoint()) + } + } + + test("shouldn't checkpoint if we have not exceeded the checkpoint interval") { + expecting { + currentClockMock.currentTime().andReturn(0).once() + } + whenExecuting(currentClockMock) { + val checkpointState = new CheckpointState(Long.MaxValue, currentClockMock) + assert(!checkpointState.shouldCheckpoint()) + } + } + + test("should add to time when advancing checkpoint") { + expecting { + currentClockMock.currentTime().andReturn(0).once() + } + whenExecuting(currentClockMock) { + val checkpointIntervalMillis = 10 + val checkpointState = new CheckpointState(checkpointIntervalMillis, currentClockMock) + assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis) + checkpointState.advanceCheckpoint() + assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis)) + } + } + + test("shutdown should checkpoint if the reason is TERMINATE") { + expecting { + checkpointerMock.checkpoint().once() + } + whenExecuting(checkpointerMock, checkpointStateMock) { + val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock) + val reason = ShutdownReason.TERMINATE + recordProcessor.shutdown(checkpointerMock, reason) + } + } + + test("shutdown should not checkpoint if the reason is something other than TERMINATE") { + expecting { + } + whenExecuting(checkpointerMock, checkpointStateMock) { + val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock) + recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE) + recordProcessor.shutdown(checkpointerMock, null) + } + } + + test("string record converter") { + val expectedString = "http://sparkinaction.com" + val expectedByteArray = expectedString.getBytes() + val stringRecordSerializer = new KinesisStringRecordSerializer() + + expectedByteArray should be(stringRecordSerializer.serialize(expectedString)) + + expectedString should be(stringRecordSerializer.deserialize(expectedByteArray)) + expectedString should be(stringRecordSerializer.deserialize(stringRecordSerializer.serialize(expectedString))) + } + + test("retry success on first attempt") { + val expectedIsStopped = false + expecting { + receiverMock.isStopped().andReturn(expectedIsStopped).once() + } + whenExecuting(receiverMock) { + val actualVal = 
KinesisUtils.retry(receiverMock.isStopped(), 2, 100) + assert(actualVal == expectedIsStopped) + } + } + + test("retry success on second attempt after a Kinesis throttling exception") { + val expectedIsStopped = false + expecting { + receiverMock.isStopped().andThrow(new ThrottlingException("error message")).andReturn(expectedIsStopped).once() + } + whenExecuting(receiverMock) { + val actualVal = KinesisUtils.retry(receiverMock.isStopped(), 2, 100) + assert(actualVal == expectedIsStopped) + } + } + + test("retry success on second attempt after a Kinesis dependency exception") { + val expectedIsStopped = false + expecting { + receiverMock.isStopped().andThrow(new KinesisClientLibDependencyException("error message")).andReturn(expectedIsStopped).once() + } + whenExecuting(receiverMock) { + val actualVal = KinesisUtils.retry(receiverMock.isStopped(), 2, 100) + assert(actualVal == expectedIsStopped) + } + } + + test("retry failed after a shutdown exception") { + expecting { + checkpointerMock.checkpoint().andThrow(new ShutdownException("error message")).once() + } + whenExecuting(checkpointerMock) { + intercept[ShutdownException] { + KinesisUtils.retry(checkpointerMock.checkpoint(), 2, 100) + } + } + } + + test("retry failed after an invalid state exception") { + expecting { + checkpointerMock.checkpoint().andThrow(new InvalidStateException("error message")).once() + } + whenExecuting(checkpointerMock) { + intercept[InvalidStateException] { + KinesisUtils.retry(checkpointerMock.checkpoint(), 2, 100) + } + } + } + + test("retry failed after unexpected exception") { + expecting { + checkpointerMock.checkpoint().andThrow(new RuntimeException("error message")).once() + } + whenExecuting(checkpointerMock) { + intercept[RuntimeException] { + KinesisUtils.retry(checkpointerMock.checkpoint(), 2, 100) + } + } + } + + test("retry failed after exhausing all retries") { + val expectedErrorMessage = "final try error message" + expecting { + checkpointerMock.checkpoint().andThrow(new ThrottlingException("error message")).andThrow(new ThrottlingException(expectedErrorMessage)).once() + } + whenExecuting(checkpointerMock) { + val exception = intercept[RuntimeException] { + KinesisUtils.retry(checkpointerMock.checkpoint(), 2, 100) + } + exception.getMessage().shouldBe(expectedErrorMessage) + } + } +} diff --git a/pom.xml b/pom.xml index 05f76d566e9d1..4dfdee12ec7f0 100644 --- a/pom.xml +++ b/pom.xml @@ -958,6 +958,14 @@ + + + spark-kinesis-asl + + extras/spark-kinesis-asl + + + java8-tests diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 599714233c18f..b2c9fd91e0e91 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -88,7 +88,7 @@ object SparkBuild extends Build { lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core) lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings) - .dependsOn(core, graphx, bagel, mllib, streaming, repl, sql) dependsOn(maybeYarn: _*) dependsOn(maybeHive: _*) dependsOn(maybeGanglia: _*) + .dependsOn(core, graphx, bagel, mllib, streaming, repl, sql) dependsOn(maybeYarn: _*) dependsOn(maybeHive: _*) dependsOn(maybeGanglia: _*) dependsOn(maybeKinesis: _*) lazy val assembleDepsTask = TaskKey[Unit]("assemble-deps") lazy val assembleDeps = assembleDepsTask := { @@ -135,6 +135,15 @@ object SparkBuild extends Build { val maybeGanglia: Seq[ClasspathDependency] = if (isGangliaEnabled) Seq(gangliaProj) else Seq() val maybeGangliaRef: Seq[ProjectReference] = if (isGangliaEnabled) 
Seq(gangliaProj) else Seq() + // Include Kinesis integration if the user has enabled Kinesis + // This is isolated from the normal build due to ASL-licensed code in the library + lazy val isKinesisEnabled = Properties.envOrNone("SPARK_KINESIS_ASL").isDefined + lazy val kinesisProj = Project("spark-kinesis-asl", file("extras/spark-kinesis-asl"), settings = kinesisSettings) + .dependsOn(streaming % "compile->compile;test->test") + val maybeKinesis: Seq[ClasspathDependency] = if (isKinesisEnabled) Seq(kinesisProj) else Seq() + val maybeKinesisRef: Seq[ProjectReference] = if (isKinesisEnabled) Seq(kinesisProj) else Seq() + + // Include the Java 8 project if the JVM version is 8+ lazy val javaVersion = System.getProperty("java.specification.version") lazy val isJava8Enabled = javaVersion.toDouble >= "1.8".toDouble @@ -171,7 +180,7 @@ object SparkBuild extends Build { .dependsOn(core, mllib, graphx, bagel, streaming, hive) dependsOn(allExternal: _*) // Everything except assembly, hive, tools, java8Tests and examples belong to packageProjects - lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx, catalyst, sql) ++ maybeYarnRef ++ maybeHiveRef ++ maybeGangliaRef + lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx, catalyst, sql) ++ maybeYarnRef ++ maybeHiveRef ++ maybeGangliaRef ++ maybeKinesisRef lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj) ++ maybeJava8Tests @@ -588,6 +597,14 @@ object SparkBuild extends Build { libraryDependencies += "com.codahale.metrics" % "metrics-ganglia" % "3.0.0" ) + def kinesisSettings = streamingSettings ++ Seq( + name := "spark-kinesis-asl", + libraryDependencies ++= Seq( + "com.amazonaws" % "amazon-kinesis-client" % "1.1.0", + "com.amazonaws" % "aws-java-sdk" % "1.8.3" + ) + ) + def java8TestsSettings = sharedSettings ++ Seq( name := "java8-tests", javacOptions := Seq("-target", "1.8", "-source", "1.8"), From cd68c0d7bb0c1ef38e7c92d0cd6eb4a7ccf2ce27 Mon Sep 17 00:00:00 2001 From: Chris Fregly Date: Fri, 18 Jul 2014 20:16:13 -0700 Subject: [PATCH 2/9] fixed typos and backward compatibility --- docs/streaming-programming-guide.md | 2 +- .../spark/streaming/kinesis/KinesisRecordProcessor.scala | 3 +-- project/SparkBuild.scala | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index c91a23b5c0c94..8f9b7c1fa0f0a 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -518,7 +518,7 @@ depending on the checkpoint frequency.
  • Failed or latent KinesisReceivers will be detected and automatically shut down or load-balanced by the KCL.
  • If possible, explicitly shut down the worker when a failure occurs.
  • -Example KinesisWordCount (and JavaKiensisWordCount) notes: +Example KinesisWordCount (and JavaKinesisWordCount) notes:
  • These examples automatically determine the number of threads to run locally based on the number of shards for the stream.
  • These examples automatically determine the number of KinesisReceivers/InputDStreams to create based on the number of shards for the stream.
  • These examples use InitialPositionInStream.LATEST (tip of stream) rather than InitialPositionInStream.TRIM_HORIZON (back 24 hours) to keep the examples easy to reason about (a minimal per-shard sketch follows these notes).
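A minimal sketch of the per-shard pattern the notes above describe, using the KinesisUtils.createStream signature added earlier in this patch series; the app name, stream name, endpoint, and numShards value are placeholders (the bundled examples derive the shard count from the stream itself via the AWS SDK):

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kinesis.KinesisUtils

    val batchInterval = Seconds(2)
    val ssc = new StreamingContext("local[4]", "KinesisSketch", batchInterval)
    val numShards = 2  // placeholder: the examples look this up with the AWS Kinesis client
    // One KinesisReceiver/DStream per shard, then union them into a single DStream.
    val shardStreams = (0 until numShards).map { _ =>
      KinesisUtils.createStream(ssc, "KinesisSketch", "mySparkStream",
        "https://kinesis.us-east-1.amazonaws.com", batchInterval.milliseconds,
        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
    }
    val unionStream = ssc.union(shardStreams)

The union keeps the downstream word-count logic identical no matter how many receivers were created.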
  • diff --git a/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala index 8dd24501fe381..c9e8ecd2ebb14 100644 --- a/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala +++ b/extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala @@ -36,8 +36,7 @@ import org.apache.spark.streaming.util.Clock * * @param Kinesis receiver * @param workerId for logging purposes - * @param checkpoint utils - * @param Kinesis checkpoint interval (millis) + * @param checkpoint state */ private[streaming] class KinesisRecordProcessor( receiver: KinesisReceiver, diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 4f4e2d11f1c00..097da182e2902 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -65,7 +65,7 @@ object SparkBuild extends PomBuild { } if (Properties.envOrNone("SPARK_KINESIS_ASL").isDefined) { println("NOTE: SPARK_KINESIS_ASL is deprecated, please use -Pspark-kinesis-asl flag.") - profiles ++= Seq("spark-ganglia-lgpl") + profiles ++= Seq("spark-kinesis-asl") } if (Properties.envOrNone("SPARK_HIVE").isDefined) { println("NOTE: SPARK_HIVE is deprecated, please use -Phive flag.") From 828f8aeb1081cf7ad9e5386e1cce933ece9c3d62 Mon Sep 17 00:00:00 2001 From: Chris Fregly Date: Mon, 21 Jul 2014 22:20:42 -0700 Subject: [PATCH 3/9] more cleanup --- .../sbt_app_core/src/main/scala/SparkApp.scala | 4 ++-- extras/spark-kinesis-asl/pom.xml | 8 ++++++-- project/SparkBuild.scala | 4 ++-- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala b/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala index 225d82a6c4876..e80c6bb614816 100644 --- a/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala +++ b/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala @@ -47,14 +47,14 @@ object SimpleApp { System.exit(-1) } if (foundGanglia) { - println("Ganglia sink was loaded via spark-core") + println("Ganglia sink was loaded via spark-ganglia-lgpl") System.exit(-1) } // Remove kinesis from default build due to ASL license issue val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisReceiver")).isSuccess if (foundKinesis) { - println("Kinesis was loaded via spark-core") + println("Kinesis was loaded via spark-kinesis-asl") System.exit(-1) } } diff --git a/extras/spark-kinesis-asl/pom.xml b/extras/spark-kinesis-asl/pom.xml index 1b4101194d42f..6e2fdc9f13690 100644 --- a/extras/spark-kinesis-asl/pom.xml +++ b/extras/spark-kinesis-asl/pom.xml @@ -24,14 +24,18 @@ ../../pom.xml - + org.apache.spark spark-kinesis-asl_2.10 jar Spark Kinesis Integration - spark-kinesis-asl + kinesis-asl diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 097da182e2902..a2e2f54745fed 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -36,7 +36,7 @@ object BuildCommons { "streaming-zeromq").map(ProjectRef(buildLocation, _)) val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests, sparkGangliaLgpl, sparkKinesisAsl) = - Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl", "spark-kinesis-asl") + Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl", "kinesis-asl") .map(ProjectRef(buildLocation, _)) val assemblyProjects@Seq(assembly, examples) = Seq("assembly", 
"examples") @@ -60,7 +60,7 @@ object SparkBuild extends PomBuild { var isAlphaYarn = false var profiles: mutable.Seq[String] = mutable.Seq.empty if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) { - println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.") + println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.") profiles ++= Seq("spark-ganglia-lgpl") } if (Properties.envOrNone("SPARK_KINESIS_ASL").isDefined) { From 338997e6e750c206bfb50a654b725be5f33beb07 Mon Sep 17 00:00:00 2001 From: Chris Fregly Date: Tue, 22 Jul 2014 08:54:35 -0700 Subject: [PATCH 4/9] improve build docs for kinesis --- docs/streaming-programming-guide.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 8f9b7c1fa0f0a..75d320fae4620 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -472,10 +472,10 @@ Furthermore, you can also implement your own custom receiver for your sources. S Build notes:
  • Spark supports a Kinesis Streaming Receiver which is not included in the default build due to licensing restrictions.
  • _**Note that by embedding this library you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.
  • -
  • For sbt users, set the `SPARK_KINESIS_ASL` environment variable before building.
  • -
  • For Maven users, enable the `-Pspark-kinesis-asl` profile.
  • -
  • User applications will need to link to the `spark-kinesis-asl` artifact.
  • The Spark Kinesis Streaming Receiver source code, examples, tests, and artifacts live in $SPARK_HOME/extras/spark-kinesis-asl.
  • +
  • Both sbt and Maven builds must enable the `-Pspark-kinesis-asl` profile.
  • +
  • To build the examples JAR, run the Maven build with the `-Pspark-kinesis-asl` profile.
  • +
  • Applications will need to link to the `spark-kinesis-asl` artifact (see the build.sbt sketch after these notes).
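A hedged sbt fragment for the linking note above; the artifact id follows the `spark-kinesis-asl` module named in these notes, and the version strings are placeholders for whatever Spark release the application builds against:

    // build.sbt (sketch): the ASL-licensed Kinesis artifact must be added explicitly;
    // it is not pulled in transitively by spark-core or spark-streaming.
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming"   % "x.y.z",  // placeholder Spark version
      "org.apache.spark" %% "spark-kinesis-asl" % "x.y.z"   // same placeholder version
    )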
  • Deployment and runtime notes:
  • Each shard of a stream is processed by one or more KinesisReceivers managed by the Kinesis Client Library (KCL) Worker (see the driver sketch after these notes).
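The KCL Worker mentioned above checkpoints each shard's progress (to DynamoDB) on the Kinesis checkpoint interval, which is separate from Spark's own checkpointing. Below is a minimal driver sketch under assumed names and values, using the createStream signature as it stands at this point in the series, showing where each interval is configured:

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kinesis.KinesisUtils

    val batchInterval = Seconds(2)
    val ssc = new StreamingContext("local[2]", "KinesisCheckpointSketch", batchInterval)
    ssc.checkpoint("/tmp/spark-streaming-checkpoint")  // Spark Streaming (driver/metadata) checkpointing
    val stream = KinesisUtils.createStream(ssc, "KinesisCheckpointSketch", "mySparkStream",
      "https://kinesis.us-east-1.amazonaws.com",
      batchInterval.milliseconds,                      // Kinesis (KCL/DynamoDB) checkpoint interval
      InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_AND_DISK_2)
    stream.checkpoint(batchInterval)                   // Spark checkpoint interval for this DStream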
  • From 6c395619dde93a9b8e9137b1150de4ae5129cf4b Mon Sep 17 00:00:00 2001 From: Chris Fregly Date: Wed, 23 Jul 2014 20:55:55 -0700 Subject: [PATCH 5/9] parameterized the versions of the aws java sdk and kinesis client --- extras/spark-kinesis-asl/pom.xml | 4 ++-- pom.xml | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/extras/spark-kinesis-asl/pom.xml b/extras/spark-kinesis-asl/pom.xml index 6e2fdc9f13690..adb63d5464754 100644 --- a/extras/spark-kinesis-asl/pom.xml +++ b/extras/spark-kinesis-asl/pom.xml @@ -52,12 +52,12 @@ com.amazonaws amazon-kinesis-client - 1.1.0 + ${aws.kinesis.client.version} com.amazonaws aws-java-sdk - 1.8.3 + ${aws.java.sdk.version} org.scalatest diff --git a/pom.xml b/pom.xml index 8619e4fa43b6e..0dece16192017 100644 --- a/pom.xml +++ b/pom.xml @@ -132,6 +132,8 @@ 3.0.0 1.7.6 0.7.1 + 1.8.3 + 1.1.0 64m 512m From 912640cb344c77102e4ca4d884b8b0d0206ed627 Mon Sep 17 00:00:00 2001 From: Chris Fregly Date: Wed, 30 Jul 2014 18:03:27 -0700 Subject: [PATCH 6/9] changed the foundKinesis class to be a publically-avail class --- dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala b/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala index 07884afaf169e..025f71a1ce45a 100644 --- a/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala +++ b/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala @@ -52,7 +52,7 @@ object SimpleApp { } // Remove kinesis from default build due to ASL license issue - val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisReceiver")).isSuccess + val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess if (foundKinesis) { println("Kinesis was loaded via kinesis-asl") System.exit(-1) From d17ca6d6a36ddf0a3030eacae0eace3fdd758cc5 Mon Sep 17 00:00:00 2001 From: Chris Fregly Date: Thu, 31 Jul 2014 10:00:09 -0700 Subject: [PATCH 7/9] per TD's feedback: updated docs, simplified the KinesisUtils api --- .../streaming/JavaKinesisWordCount.java | 4 +- .../examples/streaming/KinesisWordCount.scala | 6 +- .../streaming/kinesis/KinesisReceiver.scala | 26 +++---- .../streaming/kinesis/KinesisUtils.scala | 69 +++++++------------ 4 files changed, 38 insertions(+), 67 deletions(-) diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java index be699a2b8f86e..8543c07aed141 100644 --- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java +++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java @@ -165,14 +165,14 @@ public static void main(String[] args) { /** Create the same number of Kinesis Receivers/DStreams as stream shards, then union them all */ JavaDStream allStreams = KinesisUtils .createStream(jssc, appName, stream, endpoint, checkpointInterval.milliseconds(), - InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2()); + InitialPositionInStream.LATEST); /** Set the checkpoint interval */ allStreams.checkpoint(checkpointInterval); for (int i = 1; i < numStreams; i++) { /** Create a new Receiver/DStream for each stream shard */ JavaDStream dStream = KinesisUtils .createStream(jssc, appName, stream, endpoint, checkpointInterval.milliseconds(), - InitialPositionInStream.LATEST, 
StorageLevel.MEMORY_AND_DISK_2()); + InitialPositionInStream.LATEST); /** Set the Spark checkpoint interval */ dStream.checkpoint(checkpointInterval); diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala index d0e6cdb75cd26..bb036f4d1741e 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala @@ -159,15 +159,13 @@ object KinesisWordCount extends Logging { * them all. */ var allStreams: DStream[Array[Byte]] = KinesisUtils.createStream(ssc, appName, stream, - endpoint, checkpointInterval.milliseconds, InitialPositionInStream.LATEST, - StorageLevel.MEMORY_AND_DISK_2) + endpoint, checkpointInterval.milliseconds, InitialPositionInStream.LATEST) /** Set the checkpoint interval */ allStreams.checkpoint(checkpointInterval) for (i <- 1 until numStreams) { /** Create a new Receiver/DStream for each stream shard */ val dStream = KinesisUtils.createStream(ssc, appName, stream, endpoint, - checkpointInterval.milliseconds, - InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2) + checkpointInterval.milliseconds, InitialPositionInStream.LATEST) /** Set the Spark checkpoint interval */ dStream.checkpoint(checkpointInterval) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 3f0828431fe15..d6e4b7996877c 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -41,29 +41,23 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker * Instances of this class will get shipped to the Spark Streaming Workers * to run within a Spark Executor. * - * @param appName Kinesis Application Name. Kinesis apps are mapped to Kinesis streams - * by the Kinesis Client Library. If you change the app name or stream name, - * the KCL will throw errors. + * @param appName unique name for your Kinesis app. Multiple instances of the app pull from + * the same stream. The Kinesis Client Library coordinates all load-balancing and + * failure-recovery. * @param stream Kinesis stream name - * @param endpoint url of Kinesis service - * @param checkpointIntervalMillis for Kinesis checkpointing (not Spark checkpointing). - * See the Kinesis Spark Streaming documentation for more details on the different types - * of checkpoints. - * @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the worker's initial - * starting position in the stream. - * The values are either the beginning of the stream per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream - * (InitialPositionInStream.LATEST). - * @param persistence strategy for RDDs and DStreams. + * @param endpoint url of Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * Available endpoints: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * @param checkpointIntervalMillis interval (millis) for Kinesis checkpointing + * @param initialPositionInStream in the absence of a Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. 
*/ private[kinesis] class KinesisReceiver( appName: String, stream: String, endpoint: String, checkpointIntervalMillis: Long, - initialPositionInStream: InitialPositionInStream, - storageLevel: StorageLevel) - extends Receiver[Array[Byte]](storageLevel) with Logging { receiver => + initialPositionInStream: InitialPositionInStream) + extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_2) with Logging { receiver => /** * The following vars are built in the onStart() method which executes in the Spark Worker after diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala index 8a113bb46ddd9..f3b60f1c49686 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala @@ -28,8 +28,7 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn /** - * Facade to create the Scala-based or Java-based streams. - * Also, contains a reusable utility methods. + * Helper class to create Amazon Kinesis Input Stream * :: Experimental :: */ @Experimental @@ -37,25 +36,16 @@ object KinesisUtils extends Logging { /** * Create an InputDStream that pulls messages from a Kinesis stream. * - * @param StreamingContext object - * @param appName Kinesis Application Name. Kinesis Apps are mapped to Kinesis Streams - * by the Kinesis Client Library. If you change the App name or Stream name, - * the KCL will throw errors. - * @param stream Kinesis Stream Name - * @param endpoint url of Kinesis service - * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). - * See the Kinesis Spark Streaming documentation for more details on the different types - * of checkpoints. - * @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the + * @param ssc StreamingContext + * @param appName unique name for your Kinesis app. Multiple instances of the app pull from + * the same stream. The Kinesis Client Library coordinates all load-balancing and + * failure-recovery. + * @param stream Kinesis stream name + * @param endpoint url of Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * Available endpoints: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * @param checkpointIntervalMillis interval (millis) for Kinesis checkpointing + * @param initialPositionInStream in the absence of a Kinesis checkpoint info, this is the * worker's initial starting position in the stream. - * The values are either the beginning of the stream per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream - * (InitialPositionInStream.LATEST). - * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk - * of processing records more than once. 
- * @param storageLevel The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory - * and on-disk to 2 nodes total (primary and secondary) - * * @return ReceiverInputDStream[Array[Byte]] */ def createStream( @@ -64,34 +54,24 @@ object KinesisUtils extends Logging { stream: String, endpoint: String, checkpointIntervalMillis: Long, - initialPositionInStream: InitialPositionInStream, - storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = { + initialPositionInStream: InitialPositionInStream): ReceiverInputDStream[Array[Byte]] = { ssc.receiverStream(new KinesisReceiver(appName, stream, endpoint, checkpointIntervalMillis, - initialPositionInStream, storageLevel)) + initialPositionInStream )) } /** * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. * - * @param JavaStreamingContext object - * @param appName Kinesis Application Name. Kinesis Apps are mapped to Kinesis Streams - * by the Kinesis Client Library. If you change the App name or Stream name, - * the KCL will throw errors. - * @param stream Kinesis Stream Name - * @param endpoint url of Kinesis service - * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). - * See the Kinesis Spark Streaming documentation for more details on the different types - * of checkpoints. - * @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the + * @param jssc Java StreamingContext object + * @param appName unique name for your Kinesis app. Multiple instances of the app pull from + * the same stream. The Kinesis Client Library coordinates all load-balancing and + * failure-recovery. + * @param stream Kinesis stream name + * @param endpoint url of Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * Available endpoints: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * @param checkpointIntervalMillis interval (millis) for Kinesis checkpointing + * @param initialPositionInStream in the absence of a Kinesis checkpoint info, this is the * worker's initial starting position in the stream. - * The values are either the beginning of the stream per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream - * (InitialPositionInStream.LATEST). - * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk - * of processing records more than once. 
- * @param storageLevel The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory - * and on-disk to 2 nodes total (primary and secondary) - * * @return JavaReceiverInputDStream[Array[Byte]] */ def createStream( @@ -99,10 +79,9 @@ object KinesisUtils extends Logging { appName: String, stream: String, endpoint: String, - checkpointIntervalMillis: Long, - initialPositionInStream: InitialPositionInStream, - storageLevel: StorageLevel): JavaReceiverInputDStream[Array[Byte]] = { + checkpointIntervalMillis: Long, + initialPositionInStream: InitialPositionInStream): JavaReceiverInputDStream[Array[Byte]] = { jssc.receiverStream(new KinesisReceiver(appName, stream, endpoint, checkpointIntervalMillis, - initialPositionInStream, storageLevel)) + initialPositionInStream)) } } From bf614e9ed870a3c23670d3783d574b1e4280bd81 Mon Sep 17 00:00:00 2001 From: Chris Fregly Date: Thu, 31 Jul 2014 10:33:20 -0700 Subject: [PATCH 8/9] per matei's feedback: moved the kinesis examples into the examples/ dir --- examples/pom.xml | 5 ++ .../streaming/JavaKinesisWordCount.java | 2 +- .../examples/streaming/KinesisWordCount.scala | 4 +- extras/kinesis-asl/bin/run-kinesis-example | 60 ------------- .../kinesis-asl/bin/run-kinesis-example.cmd | 90 ------------------- .../kinesis/KinesisRecordProcessor.scala | 2 +- .../streaming/kinesis/KinesisUtils.scala | 2 +- 7 files changed, 10 insertions(+), 155 deletions(-) rename {extras/kinesis-asl => examples}/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java (99%) rename {extras/kinesis-asl => examples}/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala (99%) delete mode 100755 extras/kinesis-asl/bin/run-kinesis-example delete mode 100755 extras/kinesis-asl/bin/run-kinesis-example.cmd diff --git a/examples/pom.xml b/examples/pom.xml index c4ed0f5a6a02b..d87ac68238eb9 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -96,6 +96,11 @@ spark-streaming-mqtt_${scala.binary.version} ${project.version} + + org.apache.spark + kinesis-asl_${scala.binary.version} + ${project.version} + org.apache.hbase hbase diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java similarity index 99% rename from extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java rename to examples/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java index 8543c07aed141..f13d3c9acce8b 100644 --- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java @@ -73,7 +73,7 @@ * Example: * $ export AWS_ACCESS_KEY_ID= * $ export AWS_SECRET_KEY= - * $ $SPARK_HOME/extras/kinesis-asl/bin/run-kinesis-example \ + * $ $SPARK_HOME/bin/run-example \ * org.apache.spark.examples.streaming.JavaKinesisWordCount mySparkStream \ * https://kinesis.us-east-1.amazonaws.com * diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala similarity index 99% rename from extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala rename to examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala index bb036f4d1741e..50c3889d277fa 100644 --- 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala @@ -68,7 +68,7 @@ import com.amazonaws.services.kinesis.model.PutRecordRequest * Example: * $ export AWS_ACCESS_KEY_ID= * $ export AWS_SECRET_KEY= - * $ $SPARK_HOME/extras/kinesis-asl/bin/run-kinesis-example \ + * $ $SPARK_HOME/bin/run-example \ * org.apache.spark.examples.streaming.KinesisWordCount mySparkStream \ * https://kinesis.us-east-1.amazonaws.com * @@ -260,7 +260,7 @@ object KinesisWordCount extends Logging { * Example: * $ export AWS_ACCESS_KEY_ID= * $ export AWS_SECRET_KEY= - * $ $SPARK_HOME/extras/kinesis-asl/bin/run-kinesis-example \ + * $ $SPARK_HOME/bin/run-example \ * org.apache.spark.examples.streaming.KinesisWordCountProducer mySparkStream \ * https://kinesis.us-east-1.amazonaws.com 10 5 */ diff --git a/extras/kinesis-asl/bin/run-kinesis-example b/extras/kinesis-asl/bin/run-kinesis-example deleted file mode 100755 index 6cf01fbe773a4..0000000000000 --- a/extras/kinesis-asl/bin/run-kinesis-example +++ /dev/null @@ -1,60 +0,0 @@ -#!/usr/bin/env bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -SCALA_VERSION=2.10 - -FWDIR="$(cd `dirname $0`/../../../; pwd)" -export SPARK_HOME="$FWDIR" -KINESIS_EXAMPLES_DIR="$FWDIR"/extras/kinesis-asl - -if [ -n "$1" ]; then - EXAMPLE_CLASS="$1" - shift -else - echo "Usage: $SPARK_HOME/extras/kinesis-asl/bin/run-kinesis-example [example-args]" 1>&2 - echo " - set MASTER=XX to use a specific master" 1>&2 - echo " - can use abbreviated example class name (e.g. KinesisWordCount, JavaKinesisWordCount)" 1>&2 - echo " - must set AWS_ACCESS_KEY_ID and AWS_SECRET_KEY env variables" 1>&2 - exit 1 -fi - -export GLOBIGNORE="*-javadoc.jar:*-sources.jar" -if [ -f "$FWDIR/RELEASE" ]; then - export SPARK_KINESIS_EXAMPLES_JAR=`ls "$FWDIR"/lib/kinesis-asl*.jar` -elif [ -e "$KINESIS_EXAMPLES_DIR"/target/kinesis-asl_$SCALA_VERSION-*.jar ]; then - export SPARK_KINESIS_EXAMPLES_JAR=`ls "$KINESIS_EXAMPLES_DIR"/target/kinesis-asl_$SCALA_VERSION-*.jar` -fi - -if [[ -z $SPARK_KINESIS_EXAMPLES_JAR ]]; then - echo "Failed to find Spark Kinesis examples assembly in "$FWDIR"/lib or "$KINESIS_EXAMPLES_DIR"/target" 1>&2 - echo "You need to build Spark with maven using 'mvn -Pkinesis-asl package' before running this program." 1>&2 - exit 1 -fi - -EXAMPLE_MASTER=${MASTER:-"local[*]"} - -if [[ ! 
$EXAMPLE_CLASS == org.apache.spark.examples.streaming* ]]; then - EXAMPLE_CLASS="org.apache.spark.examples.streaming.$EXAMPLE_CLASS" -fi - -"$FWDIR"/bin/spark-submit \ - --master $EXAMPLE_MASTER \ - --class $EXAMPLE_CLASS \ - "$SPARK_KINESIS_EXAMPLES_JAR" \ - $@ diff --git a/extras/kinesis-asl/bin/run-kinesis-example.cmd b/extras/kinesis-asl/bin/run-kinesis-example.cmd deleted file mode 100755 index 0980c78391d49..0000000000000 --- a/extras/kinesis-asl/bin/run-kinesis-example.cmd +++ /dev/null @@ -1,90 +0,0 @@ -@echo off - -rem -rem Licensed to the Apache Software Foundation (ASF) under one or more -rem contributor license agreements. See the NOTICE file distributed with -rem this work for additional information regarding copyright ownership. -rem The ASF licenses this file to You under the Apache License, Version 2.0 -rem (the "License"); you may not use this file except in compliance with -rem the License. You may obtain a copy of the License at -rem -rem http://www.apache.org/licenses/LICENSE-2.0 -rem -rem Unless required by applicable law or agreed to in writing, software -rem distributed under the License is distributed on an "AS IS" BASIS, -rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -rem See the License for the specific language governing permissions and -rem limitations under the License. -rem - -set SCALA_VERSION=2.10 - -rem Figure out where the Spark framework is installed -set FWDIR=%~dp0..\..\..\ - -rem Export this as SPARK_HOME -set SPARK_HOME=%FWDIR% - -rem Load environment variables from conf\spark-env.cmd, if it exists -if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd" - -rem Test that an argument was given -if not "x%1"=="x" goto arg_given - echo Usage: SPARK_HOME/extras/kinesis-asl/bin run-kinesis-example ^ [example-args] - echo - set MASTER=XX to use a specific master - echo - can use abbreviated example class name (e.g. KinesisWordCount, JavaKinesisWordCount) - echo " - must set AWS_ACCESS_KEY_ID and AWS_SECRET_KEY env variables" 1>&2 - - goto exit -:arg_given - -set KINESIS_EXAMPLES_DIR=%FWDIR%extras\kinesis-asl - -rem Figure out the JAR file that our examples were packaged into. -set SPARK_KINESIS_EXAMPLES_JAR= -if exist "%FWDIR%RELEASE" ( - for %%d in ("%FWDIR%lib\kinesis-asl*.jar") do ( - set SPARK_KINESIS_EXAMPLES_JAR=%%d - ) -) else ( - for %%d in ("%KINESIS_EXAMPLES_DIR%\target\kinesis-asl*.jar") do ( - set SPARK_KINESIS_EXAMPLES_JAR=%%d - ) -) -if "x%SPARK_KINESIS_EXAMPLES_JAR%"=="x" ( - echo Failed to find Spark Kinesis examples assembly JAR. - echo You need to build Spark with maven using 'mvn -Pkinesis-asl package' before running this program. - goto exit -) - -rem Set master from MASTER environment variable if given -if "x%MASTER%"=="x" ( - set EXAMPLE_MASTER=local[*] -) else ( - set EXAMPLE_MASTER=%MASTER% -) - -rem If the EXAMPLE_CLASS does not start with org.apache.spark.examples.streaming, add that -set EXAMPLE_CLASS=%1 -set PREFIX=%EXAMPLE_CLASS:~0,25% -if not %PREFIX%==org.apache.spark.examples.streaming ( - set EXAMPLE_CLASS=org.apache.spark.examples.streaming.%EXAMPLE_CLASS% -) - -rem Get the tail of the argument list, to skip the first one. This is surprisingly -rem complicated on Windows. 
-set "ARGS=" -:top -shift -if "%~1" neq "" ( - set ARGS=%ARGS% "%~1" - goto :top -) -if defined ARGS set ARGS=%ARGS:~1% - -call "%FWDIR%bin\spark-submit.cmd" ^ - --master %EXAMPLE_MASTER% ^ - --class %EXAMPLE_CLASS% ^ - "%SPARK_KINESIS_EXAMPLES_JAR%" %ARGS% - -:exit diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala index 1c665cf9fd0d3..055e7297706ae 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala @@ -36,7 +36,7 @@ import com.amazonaws.services.kinesis.model.Record * @param Kinesis receiver * @param workerId for logging purposes * @param checkpointState represents the checkpoint state including the next time a - * checkpoint is needed. it's injected here for mocking purposes. + * checkpoint is needed. it's injected here for mocking purposes. */ private[kinesis] class KinesisRecordProcessor( receiver: KinesisReceiver, diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala index f3b60f1c49686..2b6b833457e35 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala @@ -56,7 +56,7 @@ object KinesisUtils extends Logging { checkpointIntervalMillis: Long, initialPositionInStream: InitialPositionInStream): ReceiverInputDStream[Array[Byte]] = { ssc.receiverStream(new KinesisReceiver(appName, stream, endpoint, checkpointIntervalMillis, - initialPositionInStream )) + initialPositionInStream)) } /** From 2a213155697c5e922d3336210b03f97ace8ae5ab Mon Sep 17 00:00:00 2001 From: Chris Fregly Date: Fri, 1 Aug 2014 15:31:49 -0700 Subject: [PATCH 9/9] SPARK-2770: Rename spark-ganglia-lgpl to ganglia-lgpl --- assembly/pom.xml | 2 +- dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala | 4 ++-- dev/audit-release/sbt_app_ganglia/build.sbt | 2 +- .../sbt_app_ganglia/src/main/scala/SparkApp.scala | 4 ++-- dev/create-release/create-release.sh | 4 ++-- docs/monitoring.md | 2 +- extras/{spark-ganglia-lgpl => ganglia-lgpl}/pom.xml | 0 .../scala/org/apache/spark/metrics/sink/GangliaSink.scala | 0 pom.xml | 4 ++-- project/SparkBuild.scala | 4 ++-- 10 files changed, 13 insertions(+), 13 deletions(-) rename extras/{spark-ganglia-lgpl => ganglia-lgpl}/pom.xml (100%) rename extras/{spark-ganglia-lgpl => ganglia-lgpl}/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala (100%) diff --git a/assembly/pom.xml b/assembly/pom.xml index 824ef383d2e47..c8f1660682879 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -176,7 +176,7 @@
    - spark-ganglia-lgpl + ganglia-lgpl org.apache.spark diff --git a/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala b/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala index 025f71a1ce45a..fc03fec9866a6 100644 --- a/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala +++ b/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala @@ -47,14 +47,14 @@ object SimpleApp { System.exit(-1) } if (foundGanglia) { - println("Ganglia sink was loaded via spark-ganglia-lgpl") + println("Ganglia sink was loaded via spark-core") System.exit(-1) } // Remove kinesis from default build due to ASL license issue val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess if (foundKinesis) { - println("Kinesis was loaded via kinesis-asl") + println("Kinesis was loaded via spark-core") System.exit(-1) } } diff --git a/dev/audit-release/sbt_app_ganglia/build.sbt b/dev/audit-release/sbt_app_ganglia/build.sbt index 6d9474acf5bbc..b506d797ca4c0 100644 --- a/dev/audit-release/sbt_app_ganglia/build.sbt +++ b/dev/audit-release/sbt_app_ganglia/build.sbt @@ -23,7 +23,7 @@ scalaVersion := System.getenv.get("SCALA_VERSION") libraryDependencies += "org.apache.spark" %% "spark-core" % System.getenv.get("SPARK_VERSION") -libraryDependencies += "org.apache.spark" %% "spark-ganglia-lgpl" % System.getenv.get("SPARK_VERSION") +libraryDependencies += "org.apache.spark" %% "ganglia-lgpl" % System.getenv.get("SPARK_VERSION") resolvers ++= Seq( "Spark Release Repository" at System.getenv.get("SPARK_RELEASE_REPOSITORY"), diff --git a/dev/audit-release/sbt_app_ganglia/src/main/scala/SparkApp.scala b/dev/audit-release/sbt_app_ganglia/src/main/scala/SparkApp.scala index 0be8e64fbfabd..3bbeb86224882 100644 --- a/dev/audit-release/sbt_app_ganglia/src/main/scala/SparkApp.scala +++ b/dev/audit-release/sbt_app_ganglia/src/main/scala/SparkApp.scala @@ -28,11 +28,11 @@ object SimpleApp { val foundConsole = Try(Class.forName("org.apache.spark.metrics.sink.ConsoleSink")).isSuccess val foundGanglia = Try(Class.forName("org.apache.spark.metrics.sink.GangliaSink")).isSuccess if (!foundConsole) { - println("Console sink not loaded via spark-core") + println("Console sink not loaded via ganglia-lgpl") System.exit(-1) } if (!foundGanglia) { - println("Ganglia sink not loaded via spark-ganglia-lgpl") + println("Ganglia sink not loaded via ganglia-lgpl") System.exit(-1) } } diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh index 42473629d4f15..d57de01598cb8 100755 --- a/dev/create-release/create-release.sh +++ b/dev/create-release/create-release.sh @@ -54,14 +54,14 @@ if [[ ! "$@" =~ --package-only ]]; then -Dmaven.javadoc.skip=true \ -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \ -Dtag=$GIT_TAG -DautoVersionSubmodules=true \ - -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \ + -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pganglia-lgpl -Pkinesis-asl \ --batch-mode release:prepare mvn -DskipTests \ -Darguments="-DskipTests=true -Dmaven.javadoc.skip=true -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" \ -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \ -Dmaven.javadoc.skip=true \ - -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \ + -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pganglia-lgpl -Pkinesis-asl \ release:perform cd .. 
diff --git a/docs/monitoring.md b/docs/monitoring.md index d07ec4a57a2cc..dcdc337086063 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -178,7 +178,7 @@ To install the `GangliaSink` you'll need to perform a custom build of Spark. _** by embedding this library you will include [LGPL](http://www.gnu.org/copyleft/lesser.html)-licensed code in your Spark package**_. For sbt users, set the `SPARK_GANGLIA_LGPL` environment variable before building. For Maven users, enable -the `-Pspark-ganglia-lgpl` profile. In addition to modifying the cluster's Spark build +the `-Pganglia-lgpl` profile. In addition to modifying the cluster's Spark build user applications will need to link to the `spark-ganglia-lgpl` artifact. The syntax of the metrics configuration file is defined in an example configuration file, diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/ganglia-lgpl/pom.xml similarity index 100% rename from extras/spark-ganglia-lgpl/pom.xml rename to extras/ganglia-lgpl/pom.xml diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala similarity index 100% rename from extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala rename to extras/ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala diff --git a/pom.xml b/pom.xml index a354624df9db7..4965c6cb3f07b 100644 --- a/pom.xml +++ b/pom.xml @@ -1007,9 +1007,9 @@ - spark-ganglia-lgpl + ganglia-lgpl - extras/spark-ganglia-lgpl + extras/ganglia-lgpl diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index e3d30ddd0594f..8835aede0b13d 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -62,8 +62,8 @@ object SparkBuild extends PomBuild { var isAlphaYarn = false var profiles: mutable.Seq[String] = mutable.Seq.empty if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) { - println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.") - profiles ++= Seq("spark-ganglia-lgpl") + println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.") + profiles ++= Seq("ganglia-lgpl") } if (Properties.envOrNone("SPARK_HIVE").isDefined) { println("NOTE: SPARK_HIVE is deprecated, please use -Phive flag.")