diff --git a/assembly/pom.xml b/assembly/pom.xml index 703f15925bc44..c8f1660682879 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -176,7 +176,7 @@ - spark-ganglia-lgpl + ganglia-lgpl org.apache.spark @@ -185,6 +185,16 @@ + + kinesis-asl + + + org.apache.spark + kinesis-asl_${scala.binary.version} + ${project.version} + + + bigtop-dist + + 4.0.0 + + org.apache.spark + spark-parent + 1.1.0-SNAPSHOT + ../../pom.xml + + + + org.apache.spark + kinesis-asl_2.10 + jar + Spark Kinesis Integration + + + kinesis-asl + + + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + test-jar + test + + + com.amazonaws + amazon-kinesis-client + ${aws.kinesis.client.version} + + + com.amazonaws + aws-java-sdk + ${aws.java.sdk.version} + + + org.scalatest + scalatest_${scala.binary.version} + test + + + org.mockito + mockito-all + test + + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + org.easymock + easymockclassextension + test + + + com.novocode + junit-interface + test + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.scalatest + scalatest-maven-plugin + + + + diff --git a/extras/kinesis-asl/src/main/resources/log4j.properties b/extras/kinesis-asl/src/main/resources/log4j.properties new file mode 100644 index 0000000000000..ad789341e62c9 --- /dev/null +++ b/extras/kinesis-asl/src/main/resources/log4j.properties @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=WARN, console + +# File appender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Console appender +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + +# Settings to quiet third party logs that are too verbose +log4j.logger.org.eclipse.jetty=WARN +log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR +log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO +log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO + +# Log all Kinesis Streaming messages +log4j.logger.org.apache.spark.examples.streaming=DEBUG +log4j.logger.org.apache.spark.streaming.kinesis=DEBUG diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/CheckpointState.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/CheckpointState.scala new file mode 100644 index 0000000000000..febde542723b2 --- /dev/null +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/CheckpointState.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.SystemClock + +/** + * This is a helper class for managing checkpoint clocks. + * + * @param checkpoint interval in millis + * @param current clock. if none specified, will default to current SystemClock + */ +private[kinesis] class CheckpointState( + checkpointIntervalMillis: Long, + currentClock: Clock = new SystemClock()) + extends Logging { + /** + * Initialize the checkpoint clock using the given currentClock + checkpointIntervalMillis + */ + val checkpointClock = new ManualClock() + checkpointClock.setTime(currentClock.currentTime() + checkpointIntervalMillis) + + /** + * Check if it's time to checkpoint based on the current time and the derived time + * for the next checkpoint + * + * @return true if it's time to checkpoint + */ + def shouldCheckpoint(): Boolean = { + new SystemClock().currentTime() > checkpointClock.currentTime() + } + + /** + * Advance the checkpoint clock by the checkpoint interval. + */ + def advanceCheckpoint() = { + checkpointClock.addToTime(checkpointIntervalMillis) + } +} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala new file mode 100644 index 0000000000000..d6e4b7996877c --- /dev/null +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.net.InetAddress +import java.util.UUID + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver + +import com.amazonaws.auth.AWSCredentialsProvider +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker + +/** + * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. + * This implementation relies on the Kinesis Client Library (KCL) Worker as described here: + * https://github.com/awslabs/amazon-kinesis-client + * This is a custom receiver used with StreamingContext.receiverStream(Receiver) + * as described here: + * http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Instances of this class will get shipped to the Spark Streaming Workers + * to run within a Spark Executor. + * + * @param appName unique name for your Kinesis app. Multiple instances of the app pull from + * the same stream. The Kinesis Client Library coordinates all load-balancing and + * failure-recovery. + * @param stream Kinesis stream name + * @param endpoint url of Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * Available endpoints: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * @param checkpointIntervalMillis interval (millis) for Kinesis checkpointing + * @param initialPositionInStream in the absence of a Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + */ +private[kinesis] class KinesisReceiver( + appName: String, + stream: String, + endpoint: String, + checkpointIntervalMillis: Long, + initialPositionInStream: InitialPositionInStream) + extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_2) with Logging { receiver => + + /** + * The following vars are built in the onStart() method which executes in the Spark Worker after + * this code is serialized and shipped remotely. + */ + + /** + * workerId should be based on the ip address of the actual Spark Worker where this code runs + * (not the Driver's ip address.) + */ + var workerId: String = null + + /** + * This impl uses the DefaultAWSCredentialsProviderChain and searches for credentials + * in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file at the default location (~/.aws/credentials) shared by all + * AWS SDKs and the AWS CLI + * Instance profile credentials delivered through the Amazon EC2 metadata service + */ + var credentialsProvider: AWSCredentialsProvider = null + + /** KCL config instance. */ + var kinesisClientLibConfiguration: KinesisClientLibConfiguration = null + + /** + * RecordProcessorFactory creates impls of IRecordProcessor. + * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the + * IRecordProcessor.processRecords() method. + * We're using our custom KinesisRecordProcessor in this case. + */ + var recordProcessorFactory: IRecordProcessorFactory = null + + /** + * Create a Kinesis Worker. + * This is the core client abstraction from the Kinesis Client Library (KCL). + * We pass the RecordProcessorFactory from above as well as the KCL config instance. + * A Kinesis Worker can process 1..* shards from the given stream - each with its + * own RecordProcessor. + */ + var worker: Worker = null + + /** + * This is called when the KinesisReceiver starts and must be non-blocking. + * The KCL creates and manages the receiving/processing thread pool through the Worker.run() + * method. + */ + override def onStart() { + workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID() + credentialsProvider = new DefaultAWSCredentialsProviderChain() + kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, stream, + credentialsProvider, workerId).withKinesisEndpoint(endpoint) + .withInitialPositionInStream(initialPositionInStream).withTaskBackoffTimeMillis(500) + recordProcessorFactory = new IRecordProcessorFactory { + override def createProcessor: IRecordProcessor = new KinesisRecordProcessor(receiver, + workerId, new CheckpointState(checkpointIntervalMillis)) + } + worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration) + worker.run() + logInfo(s"Started receiver with workerId $workerId") + } + + /** + * This is called when the KinesisReceiver stops. + * The KCL worker.shutdown() method stops the receiving/processing threads. + * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown. + */ + override def onStop() { + worker.shutdown() + logInfo(s"Shut down receiver with workerId $workerId") + workerId = null + credentialsProvider = null + kinesisClientLibConfiguration = null + recordProcessorFactory = null + worker = null + } +} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala new file mode 100644 index 0000000000000..055e7297706ae --- /dev/null +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.List + +import scala.collection.JavaConversions.asScalaBuffer +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.Logging + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason +import com.amazonaws.services.kinesis.model.Record + +/** + * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor. + * This implementation operates on the Array[Byte] from the KinesisReceiver. + * The Kinesis Worker creates an instance of this KinesisRecordProcessor upon startup. + * + * @param Kinesis receiver + * @param workerId for logging purposes + * @param checkpointState represents the checkpoint state including the next time a + * checkpoint is needed. it's injected here for mocking purposes. + */ +private[kinesis] class KinesisRecordProcessor( + receiver: KinesisReceiver, + workerId: String, + checkpointState: CheckpointState) extends IRecordProcessor with Logging { + + /** shardId to be populated during initialize() */ + var shardId: String = _ + + /** + * The Kinesis Client Library calls this method during IRecordProcessor initialization. + * + * @param shardId assigned by the KCL to this particular RecordProcessor. + */ + override def initialize(shardId: String) { + logInfo(s"Initialize: Initializing workerId $workerId with shardId $shardId") + this.shardId = shardId + } + + /** + * This method is called by the KCL when a batch of records is pulled from the Kinesis stream. + * This is the record-processing bridge between the KCL's IRecordProcessor.processRecords() + * and Spark Streaming's Receiver.store(). + * + * @param list of records from the Kinesis stream shard + * @param checkpointer used to update Kinesis when this batch has been processed/stored + * in the DStream + */ + override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) { + if (!receiver.isStopped()) { + try { + /** + * Convert the list of records to a list of Array[Byte] + * Note: If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming + * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the + * internally-configured Spark serializer (kryo, etc). + * This is not desirable, so we instead store a raw Array[Byte] and decouple + * ourselves from the internal serialization strategy. + */ + val batchByteArrays = new ArrayBuffer[Array[Byte]](batch.size()) + batchByteArrays ++= batch.map(record => record.getData().array()) + + /** Store the list of Array[Byte] in Spark */ + KinesisRecordProcessorUtils.retry(receiver.store(batchByteArrays), 4, 500) + logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId") + + /** + * Checkpoint the sequence number of the last record successfully processed/stored + * in the batch. + * In this implementation, we're checkpointing after the given checkpointIntervalMillis. + * Note that this logic requires that processRecords() be called AND that it's time to + * checkpoint. I point this out because there is no background thread running the + * checkpointer. Checkpointing is tested and trigger only when a new batch comes in. + * If the worker is shutdown cleanly, checkpoint will happen (see shutdown() below). + * However, if the worker dies unexpectedly, a checkpoint may not happen. + * This could lead to records being processed more than once. + */ + if (checkpointState.shouldCheckpoint()) { + /** Perform the checkpoint */ + KinesisRecordProcessorUtils.retry(checkpointer.checkpoint(), 4, 500) + + /** Update the next checkpoint time */ + checkpointState.advanceCheckpoint() + + logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint of ${batch.size}" + + s" records for shardId $shardId") + logDebug(s"Checkpoint: Next checkpoint is at " + + s" ${checkpointState.checkpointClock.currentTime()} for shardId $shardId") + } + } catch { + case e: Throwable => { + /** + * If there is a failure within the batch, the batch will not be checkpointed. + * This will potentially cause records since the last checkpoint to be processed + * more than once. + */ + logError(s"Exception: WorkerId $workerId encountered and exception while storing " + + " or checkpointing a batch for workerId $workerId and shardId $shardId.", e) + + /** Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor.*/ + throw e + } + } + } else { + /** RecordProcessor has been stopped. */ + logInfo(s"Stopped: The Spark KinesisReceiver has stopped for workerId $workerId" + + s" and shardId $shardId. No more records will be processed.") + } + } + + /** + * Kinesis Client Library is shutting down this Worker for 1 of 2 reasons: + * 1) the stream is resharding by splitting or merging adjacent shards + * (ShutdownReason.TERMINATE) + * 2) the failed or latent Worker has stopped sending heartbeats for whatever reason + * (ShutdownReason.ZOMBIE) + * + * @param checkpointer used to performn a Kinesis checkpoint for ShutdownReason.TERMINATE + * @param shutdown reason (ShutdownReason.TERMINATE or ShutdownReason.ZOMBIE) + */ + override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) { + logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason") + reason match { + /** + * TERMINATE Use Case. Checkpoint. + * Checkpoint to indicate that all records from the shard have been drained and processed. + * It's now OK to read from the new shards that resulted from a resharding event. + */ + case ShutdownReason.TERMINATE => KinesisRecordProcessorUtils.retry(checkpointer.checkpoint(), + 4, 500) + + /** + * ZOMBIE Use Case. NoOp. + * No checkpoint because other workers may have taken over and already started processing + * the same records. + * This may lead to records being processed more than once. + */ + case ShutdownReason.ZOMBIE => + + /** Unknown reason. NoOp */ + case _ => + } + } +} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessorUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessorUtils.scala new file mode 100644 index 0000000000000..63d839f3a3bb3 --- /dev/null +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessorUtils.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import scala.util.Random + +import org.apache.spark.Logging + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException + + +/** + * Helper for the KinesisRecordProcessor. + */ +private[kinesis] object KinesisRecordProcessorUtils extends Logging { + /** + * Retry the given amount of times with a random backoff time (millis) less than the + * given maxBackOffMillis + * + * @param expression expression to evalute + * @param numRetriesLeft number of retries left + * @param maxBackOffMillis: max millis between retries + * + * @return Evaluation of the given expression + * @throws Unretryable exception, unexpected exception, + * or any exception that persists after numRetriesLeft reaches 0 + */ + @annotation.tailrec + def retry[T](expression: => T, numRetriesLeft: Int, maxBackOffMillis: Int): T = { + util.Try { expression } match { + /** If the function succeeded, evaluate to x. */ + case util.Success(x) => x + /** If the function failed, either retry or throw the exception */ + case util.Failure(e) => e match { + /** Retry: Throttling or other Retryable exception has occurred */ + case _: ThrottlingException | _: KinesisClientLibDependencyException if numRetriesLeft > 1 + => { + val backOffMillis = Random.nextInt(maxBackOffMillis) + Thread.sleep(backOffMillis) + logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e) + retry(expression, numRetriesLeft - 1, maxBackOffMillis) + } + /** Throw: Shutdown has been requested by the Kinesis Client Library.*/ + case _: ShutdownException => { + logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e) + throw e + } + /** Throw: Non-retryable exception has occurred with the Kinesis Client Library */ + case _: InvalidStateException => { + logError(s"InvalidStateException: Cannot save checkpoint to the DynamoDB table used" + + s" by the Amazon Kinesis Client Library. Table likely doesn't exist.", e) + throw e + } + /** Throw: Unexpected exception has occurred */ + case _ => { + logError(s"Unexpected, non-retryable exception.", e) + throw e + } + } + } + } +} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordSerializer.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordSerializer.scala new file mode 100644 index 0000000000000..b63f19a8fead8 --- /dev/null +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordSerializer.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +/** + * Convert custom types to/from Array[Byte]. + * @tparam type to serialize/deserialize + */ +private[streaming] trait KinesisRecordSerializer[T] extends Serializable { + /** + * Convert type to Array[Byte] + * + * @param type to serialize + * @return byte array + */ + def serialize(t: T): Array[Byte] + + /** + * Convert Array[Byte] to type + * + * @param byte array + * @return deserialized type + */ + def deserialize(array: Array[Byte]): T +} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisStringRecordSerializer.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisStringRecordSerializer.scala new file mode 100644 index 0000000000000..4833ccd63d380 --- /dev/null +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisStringRecordSerializer.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging + +/** + * Implementation of KinesisRecordSerializer to convert Array[Byte] to/from String. + */ +class KinesisStringRecordSerializer extends KinesisRecordSerializer[String] with Logging { + /** + * Convert String to Array[Byte] + * + * @param string to serialize + * @return byte array + */ + def serialize(string: String): Array[Byte] = { + string.getBytes() + } + + /** + * Convert Array[Byte] to String + * + * @param byte array + * @return deserialized string + */ + def deserialize(array: Array[Byte]): String = { + new String(array) + } +} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala new file mode 100644 index 0000000000000..2b6b833457e35 --- /dev/null +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Helper class to create Amazon Kinesis Input Stream + * :: Experimental :: + */ +@Experimental +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param ssc StreamingContext + * @param appName unique name for your Kinesis app. Multiple instances of the app pull from + * the same stream. The Kinesis Client Library coordinates all load-balancing and + * failure-recovery. + * @param stream Kinesis stream name + * @param endpoint url of Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * Available endpoints: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * @param checkpointIntervalMillis interval (millis) for Kinesis checkpointing + * @param initialPositionInStream in the absence of a Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( + ssc: StreamingContext, + appName: String, + stream: String, + endpoint: String, + checkpointIntervalMillis: Long, + initialPositionInStream: InitialPositionInStream): ReceiverInputDStream[Array[Byte]] = { + ssc.receiverStream(new KinesisReceiver(appName, stream, endpoint, checkpointIntervalMillis, + initialPositionInStream)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * + * @param jssc Java StreamingContext object + * @param appName unique name for your Kinesis app. Multiple instances of the app pull from + * the same stream. The Kinesis Client Library coordinates all load-balancing and + * failure-recovery. + * @param stream Kinesis stream name + * @param endpoint url of Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * Available endpoints: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * @param checkpointIntervalMillis interval (millis) for Kinesis checkpointing + * @param initialPositionInStream in the absence of a Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * @return JavaReceiverInputDStream[Array[Byte]] + */ + def createStream( + jssc: JavaStreamingContext, + appName: String, + stream: String, + endpoint: String, + checkpointIntervalMillis: Long, + initialPositionInStream: InitialPositionInStream): JavaReceiverInputDStream[Array[Byte]] = { + jssc.receiverStream(new KinesisReceiver(appName, stream, endpoint, checkpointIntervalMillis, + initialPositionInStream)) + } +} diff --git a/extras/kinesis-asl/src/test/resources/log4j.properties b/extras/kinesis-asl/src/test/resources/log4j.properties new file mode 100644 index 0000000000000..b4519708afdf2 --- /dev/null +++ b/extras/kinesis-asl/src/test/resources/log4j.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala new file mode 100644 index 0000000000000..3e97b2ce289c3 --- /dev/null +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.nio.ByteBuffer + +import scala.collection.JavaConversions.seqAsJavaList +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.ManualClock +import org.scalatest.BeforeAndAfter +import org.scalatest.FunSuite +import org.scalatest.Matchers +import org.scalatest.mock.EasyMockSugar + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason +import com.amazonaws.services.kinesis.model.Record + +/** + * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor + */ +class KinesisReceiverSuite extends FunSuite with Matchers with BeforeAndAfter with EasyMockSugar { + val app = "TestKinesisReceiver" + val stream = "mySparkStream" + val endpoint = "endpoint-url" + val workerId = "dummyWorkerId" + val shardId = "dummyShardId" + + val record1 = new Record() + record1.setData(ByteBuffer.wrap("Spark In Action".getBytes())) + val record2 = new Record() + record2.setData(ByteBuffer.wrap("Learning Spark".getBytes())) + val batch = List[Record](record1, record2) + val expectedArrayBuffer = new ArrayBuffer[Array[Byte]]() += record1.getData().array() += + record2.getData().array() + + var receiverMock: KinesisReceiver = _ + var checkpointerMock: IRecordProcessorCheckpointer = _ + var checkpointClockMock: ManualClock = _ + var checkpointStateMock: CheckpointState = _ + var currentClockMock: Clock = _ + + before { + receiverMock = mock[KinesisReceiver] + checkpointerMock = mock[IRecordProcessorCheckpointer] + checkpointClockMock = mock[ManualClock] + checkpointStateMock = mock[CheckpointState] + currentClockMock = mock[Clock] + } + + test("process records including store and checkpoint") { + val expectedCheckpointIntervalMillis = 10 + expecting { + receiverMock.isStopped().andReturn(false).once() + receiverMock.store(expectedArrayBuffer).once() + checkpointStateMock.shouldCheckpoint().andReturn(true).once() + checkpointerMock.checkpoint().once() + checkpointStateMock.advanceCheckpoint().once() + } + whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) { + val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, + checkpointStateMock) + recordProcessor.processRecords(batch, checkpointerMock) + } + } + + test("shouldn't store and checkpoint when receiver is stopped") { + expecting { + receiverMock.isStopped().andReturn(true).once() + } + whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) { + val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, + checkpointStateMock) + recordProcessor.processRecords(batch, checkpointerMock) + } + } + + test("shouldn't checkpoint when exception occurs during store") { + expecting { + receiverMock.isStopped().andReturn(false).once() + receiverMock.store(expectedArrayBuffer).andThrow(new RuntimeException()).once() + } + whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) { + intercept[RuntimeException] { + val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, + checkpointStateMock) + recordProcessor.processRecords(batch, checkpointerMock) + } + } + } + + test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") { + expecting { + currentClockMock.currentTime().andReturn(0).once() + } + whenExecuting(currentClockMock) { + val checkpointIntervalMillis = 10 + val checkpointState = new CheckpointState(checkpointIntervalMillis, currentClockMock) + assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis) + } + } + + test("should checkpoint if we have exceeded the checkpoint interval") { + expecting { + currentClockMock.currentTime().andReturn(0).once() + } + whenExecuting(currentClockMock) { + val checkpointState = new CheckpointState(Long.MinValue, currentClockMock) + assert(checkpointState.shouldCheckpoint()) + } + } + + test("shouldn't checkpoint if we have not exceeded the checkpoint interval") { + expecting { + currentClockMock.currentTime().andReturn(0).once() + } + whenExecuting(currentClockMock) { + val checkpointState = new CheckpointState(Long.MaxValue, currentClockMock) + assert(!checkpointState.shouldCheckpoint()) + } + } + + test("should add to time when advancing checkpoint") { + expecting { + currentClockMock.currentTime().andReturn(0).once() + } + whenExecuting(currentClockMock) { + val checkpointIntervalMillis = 10 + val checkpointState = new CheckpointState(checkpointIntervalMillis, currentClockMock) + assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis) + checkpointState.advanceCheckpoint() + assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis)) + } + } + + test("shutdown should checkpoint if the reason is TERMINATE") { + expecting { + checkpointerMock.checkpoint().once() + } + whenExecuting(checkpointerMock, checkpointStateMock) { + val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, + checkpointStateMock) + val reason = ShutdownReason.TERMINATE + recordProcessor.shutdown(checkpointerMock, reason) + } + } + + test("shutdown should not checkpoint if the reason is something other than TERMINATE") { + expecting { + } + whenExecuting(checkpointerMock, checkpointStateMock) { + val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, + checkpointStateMock) + recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE) + recordProcessor.shutdown(checkpointerMock, null) + } + } + + test("string record converter") { + val expectedString = "http://sparkinaction.com" + val expectedByteArray = expectedString.getBytes() + val stringRecordSerializer = new KinesisStringRecordSerializer() + + expectedByteArray should be(stringRecordSerializer.serialize(expectedString)) + + expectedString should be(stringRecordSerializer.deserialize(expectedByteArray)) + expectedString should + be(stringRecordSerializer.deserialize(stringRecordSerializer.serialize(expectedString))) + } + + test("retry success on first attempt") { + val expectedIsStopped = false + expecting { + receiverMock.isStopped().andReturn(expectedIsStopped).once() + } + whenExecuting(receiverMock) { + val actualVal = KinesisRecordProcessorUtils.retry(receiverMock.isStopped(), 2, 100) + assert(actualVal == expectedIsStopped) + } + } + + test("retry success on second attempt after a Kinesis throttling exception") { + val expectedIsStopped = false + expecting { + receiverMock.isStopped().andThrow(new ThrottlingException("error message")) + .andReturn(expectedIsStopped).once() + } + whenExecuting(receiverMock) { + val actualVal = KinesisRecordProcessorUtils.retry(receiverMock.isStopped(), 2, 100) + assert(actualVal == expectedIsStopped) + } + } + + test("retry success on second attempt after a Kinesis dependency exception") { + val expectedIsStopped = false + expecting { + receiverMock.isStopped().andThrow(new KinesisClientLibDependencyException("error message")) + .andReturn(expectedIsStopped).once() + } + whenExecuting(receiverMock) { + val actualVal = KinesisRecordProcessorUtils.retry(receiverMock.isStopped(), 2, 100) + assert(actualVal == expectedIsStopped) + } + } + + test("retry failed after a shutdown exception") { + expecting { + checkpointerMock.checkpoint().andThrow(new ShutdownException("error message")).once() + } + whenExecuting(checkpointerMock) { + intercept[ShutdownException] { + KinesisRecordProcessorUtils.retry(checkpointerMock.checkpoint(), 2, 100) + } + } + } + + test("retry failed after an invalid state exception") { + expecting { + checkpointerMock.checkpoint().andThrow(new InvalidStateException("error message")).once() + } + whenExecuting(checkpointerMock) { + intercept[InvalidStateException] { + KinesisRecordProcessorUtils.retry(checkpointerMock.checkpoint(), 2, 100) + } + } + } + + test("retry failed after unexpected exception") { + expecting { + checkpointerMock.checkpoint().andThrow(new RuntimeException("error message")).once() + } + whenExecuting(checkpointerMock) { + intercept[RuntimeException] { + KinesisRecordProcessorUtils.retry(checkpointerMock.checkpoint(), 2, 100) + } + } + } + + test("retry failed after exhausing all retries") { + val expectedErrorMessage = "final try error message" + expecting { + checkpointerMock.checkpoint().andThrow(new ThrottlingException("error message")) + .andThrow(new ThrottlingException(expectedErrorMessage)).once() + } + whenExecuting(checkpointerMock) { + val exception = intercept[RuntimeException] { + KinesisRecordProcessorUtils.retry(checkpointerMock.checkpoint(), 2, 100) + } + exception.getMessage().shouldBe(expectedErrorMessage) + } + } +} diff --git a/make-distribution.sh b/make-distribution.sh index 1441497b3995a..6d934f7fe2b35 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -196,6 +196,8 @@ cp -r "$FWDIR/bin" "$DISTDIR" cp -r "$FWDIR/python" "$DISTDIR" cp -r "$FWDIR/sbin" "$DISTDIR" cp -r "$FWDIR/ec2" "$DISTDIR" +cp -r "$FWDIR/extras/kinesis-asl/bin" "$DISTDIR" + # Download and copy in tachyon, if requested if [ "$SPARK_TACHYON" == "true" ]; then diff --git a/pom.xml b/pom.xml index ae97bf03c53a2..4965c6cb3f07b 100644 --- a/pom.xml +++ b/pom.xml @@ -134,6 +134,8 @@ 3.0.0 1.7.6 0.7.1 + 1.8.3 + 1.1.0 64m 512m @@ -1005,9 +1007,17 @@ - spark-ganglia-lgpl + ganglia-lgpl - extras/spark-ganglia-lgpl + extras/ganglia-lgpl + + + + + + kinesis-asl + + extras/kinesis-asl diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a8bbd55861954..8835aede0b13d 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -37,8 +37,8 @@ object BuildCommons { "spark", "sql", "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter", "streaming-zeromq").map(ProjectRef(buildLocation, _)) - val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests, sparkGangliaLgpl) = - Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl") + val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests, sparkGangliaLgpl, sparkKinesisAsl) = + Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl", "kinesis-asl") .map(ProjectRef(buildLocation, _)) val assemblyProjects@Seq(assembly, examples) = Seq("assembly", "examples") @@ -63,7 +63,7 @@ object SparkBuild extends PomBuild { var profiles: mutable.Seq[String] = mutable.Seq.empty if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) { println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.") - profiles ++= Seq("spark-ganglia-lgpl") + profiles ++= Seq("ganglia-lgpl") } if (Properties.envOrNone("SPARK_HIVE").isDefined) { println("NOTE: SPARK_HIVE is deprecated, please use -Phive flag.")