diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index 2093749f3ee6b..fd1fc26d1b95a 100755 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -42,6 +42,7 @@ object ClientUtils extends Logging{ * @param producerConfig The producer's config * @return topic metadata response */ + @deprecated("This method has been deprecated and will be removed in a future release.", "0.10.0.0") def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = { var fetchMetaDataSucceeded: Boolean = false var i: Int = 0 diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala index c465da58019f8..44f924535ec6f 100644 --- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala +++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala @@ -21,6 +21,8 @@ import kafka.producer.ProducerConfig import kafka.producer.KeyedMessage import scala.collection.mutable +@deprecated("This class has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0") class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only { def this(config: ProducerConfig) = this(new kafka.producer.Producer[K,V](config)) diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala index 72ecae1f3484f..ec3f31e608ddb 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala @@ -192,6 +192,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging { removeAllMetricsInList(KafkaMetricsGroup.consumerMetricNameList, clientId) } + @deprecated("This method has been deprecated and will be removed in a future release.", since = "0.10.0.0") def removeAllProducerMetrics(clientId: String) { ProducerRequestStatsRegistry.removeProducerRequestStats(clientId) ProducerTopicStatsRegistry.removeProducerTopicStats(clientId) diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala index 9d0976f49c3d0..244b9fe3da52d 100644 --- a/core/src/main/scala/kafka/producer/BaseProducer.scala +++ b/core/src/main/scala/kafka/producer/BaseProducer.scala @@ -21,11 +21,15 @@ import java.util.Properties // A base producer used whenever we need to have options for both old and new producers; // this class will be removed once we fully rolled out 0.9 +@deprecated("This trait has been deprecated and will be removed in a future release." + + "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0") trait BaseProducer { def send(topic: String, key: Array[Byte], value: Array[Byte]) def close() } +@deprecated("This class has been deprecated and will be removed in a future release." + + "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0") class NewShinyProducer(producerProps: Properties) extends BaseProducer { import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback @@ -50,6 +54,8 @@ class NewShinyProducer(producerProps: Properties) extends BaseProducer { } } +@deprecated("This class has been deprecated and will be removed in a future release." + + "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0") class OldProducer(producerProps: Properties) extends BaseProducer { // default to byte array partitioner diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index 6fa00dd0174ee..4616c7e623c06 100644 --- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -24,7 +24,7 @@ import kafka.common.KafkaException import kafka.utils.Logging import kafka.client.ClientUtils - +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class BrokerPartitionInfo(producerConfig: ProducerConfig, producerPool: ProducerPool, topicPartitionInfo: HashMap[String, TopicMetadata]) @@ -101,4 +101,5 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, } +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int]) diff --git a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala index e6b100eb0f5de..928c37cc4a747 100755 --- a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala +++ b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala @@ -21,6 +21,8 @@ package kafka.producer import kafka.utils._ import org.apache.kafka.common.utils.Utils +@deprecated("This class has been deprecated and will be removed in a future release." + + "It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0") class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner { def partition(key: Any, numPartitions: Int): Int = { Utils.abs(java.util.Arrays.hashCode(key.asInstanceOf[Array[Byte]])) % numPartitions diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala index 1141ed16769b8..9d334ac19ca8a 100755 --- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala +++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala @@ -21,6 +21,8 @@ package kafka.producer import kafka.utils._ import org.apache.kafka.common.utils.Utils +@deprecated("This class has been deprecated and will be removed in a future release." + + "It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0") class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner { private val random = new java.util.Random diff --git a/core/src/main/scala/kafka/producer/KeyedMessage.scala b/core/src/main/scala/kafka/producer/KeyedMessage.scala index dbcf29515bb86..6941760990214 100644 --- a/core/src/main/scala/kafka/producer/KeyedMessage.scala +++ b/core/src/main/scala/kafka/producer/KeyedMessage.scala @@ -21,6 +21,8 @@ package kafka.producer * A topic, key, and value. * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored. */ +@deprecated("This class has been deprecated and will be removed in a future release." + + "Please use org.apache.kafka.clients.producer.ProducerRecord instead.", "0.10.0.0") case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) { if(topic == null) throw new IllegalArgumentException("Topic cannot be null.") diff --git a/core/src/main/scala/kafka/producer/Partitioner.scala b/core/src/main/scala/kafka/producer/Partitioner.scala index efe6d6df87b0e..cf6206c008dee 100644 --- a/core/src/main/scala/kafka/producer/Partitioner.scala +++ b/core/src/main/scala/kafka/producer/Partitioner.scala @@ -23,6 +23,8 @@ package kafka.producer * Implementations will be constructed via reflection and are required to have a constructor that takes a single * VerifiableProperties instance--this allows passing configuration properties into the partitioner implementation. */ +@deprecated("This trait has been deprecated and will be removed in a future release." + + "Please use org.apache.kafka.clients.producer.Partitioner instead.", "0.10.0.0") trait Partitioner { /** * Uses the key to calculate a partition bucket id for routing diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index 4be06c8d3fc7f..e8c89c378da6d 100755 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -25,7 +25,8 @@ import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThre import kafka.serializer.Encoder import kafka.utils._ - +@deprecated("This class has been deprecated and will be removed in a future release." + + "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0") class Producer[K,V](val config: ProducerConfig, private val eventHandler: EventHandler[K,V]) // only for unit testing extends Logging { diff --git a/core/src/main/scala/kafka/producer/ProducerClosedException.scala b/core/src/main/scala/kafka/producer/ProducerClosedException.scala index 27a529331106f..4f2f7316dabf6 100644 --- a/core/src/main/scala/kafka/producer/ProducerClosedException.scala +++ b/core/src/main/scala/kafka/producer/ProducerClosedException.scala @@ -17,5 +17,6 @@ package kafka.producer +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class ProducerClosedException() extends RuntimeException("producer already closed") { } diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala index 08a4e5146678d..44abb2cf1f314 100755 --- a/core/src/main/scala/kafka/producer/ProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -23,6 +23,8 @@ import kafka.utils.{CoreUtils, VerifiableProperties} import kafka.message.NoCompressionCodec import kafka.common.{InvalidConfigException, Config} +@deprecated("This object has been deprecated and will be removed in a future release." + + "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0") object ProducerConfig extends Config { def validate(config: ProducerConfig) { validateClientId(config.clientId) @@ -48,6 +50,8 @@ object ProducerConfig extends Config { } } +@deprecated("This class has been deprecated and will be removed in a future release." + + "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0") class ProducerConfig private (val props: VerifiableProperties) extends AsyncProducerConfig with SyncProducerConfigShared { import ProducerConfig._ diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala index 5ad68129403dd..60cef6397f7b4 100644 --- a/core/src/main/scala/kafka/producer/ProducerPool.scala +++ b/core/src/main/scala/kafka/producer/ProducerPool.scala @@ -26,7 +26,7 @@ import kafka.utils.Logging import scala.collection.mutable.HashMap - +@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0") object ProducerPool { /** * Used in ProducerPool to initiate a SyncProducer connection with a broker. @@ -40,6 +40,7 @@ object ProducerPool { } } +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class ProducerPool(val config: ProducerConfig) extends Logging { private val syncProducers = new HashMap[Int, SyncProducer] private val lock = new Object() diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala index b453f639b9082..8ab948a368047 100644 --- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit import kafka.utils.Pool import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker} +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup { val tags = metricId match { case ClientIdAndBroker(clientId, brokerHost, brokerPort) => Map("clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort" -> brokerPort.toString) @@ -36,6 +37,7 @@ class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup * Tracks metrics of requests made by a given producer client to all brokers. * @param clientId ClientId of the given producer */ +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class ProducerRequestStats(clientId: String) { private val valueFactory = (k: ClientIdBroker) => new ProducerRequestMetrics(k) private val stats = new Pool[ClientIdBroker, ProducerRequestMetrics](Some(valueFactory)) @@ -51,6 +53,7 @@ class ProducerRequestStats(clientId: String) { /** * Stores the request stats information of each producer client in a (clientId -> ProducerRequestStats) map. */ +@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0") object ProducerRequestStatsRegistry { private val valueFactory = (k: String) => new ProducerRequestStats(k) private val globalStats = new Pool[String, ProducerRequestStats](Some(valueFactory)) diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala index 1d0fa888c99a5..9466f26d13a2e 100644 --- a/core/src/main/scala/kafka/producer/ProducerStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerStats.scala @@ -20,6 +20,7 @@ import kafka.metrics.KafkaMetricsGroup import java.util.concurrent.TimeUnit import kafka.utils.Pool +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class ProducerStats(clientId: String) extends KafkaMetricsGroup { val tags: Map[String, String] = Map("clientId" -> clientId) val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS, tags) @@ -30,6 +31,7 @@ class ProducerStats(clientId: String) extends KafkaMetricsGroup { /** * Stores metrics of serialization and message sending activity of each producer client in a (clientId -> ProducerStats) map. */ +@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0") object ProducerStatsRegistry { private val valueFactory = (k: String) => new ProducerStats(k) private val statsRegistry = new Pool[String, ProducerStats](Some(valueFactory)) diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala index 97594c8313672..7bb9610c2c408 100644 --- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala @@ -21,7 +21,7 @@ import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic} import kafka.utils.{Pool, threadsafe} import java.util.concurrent.TimeUnit - +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") @threadsafe class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup { val tags = metricId match { @@ -38,6 +38,7 @@ class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup { * Tracks metrics for each topic the given producer client has produced data to. * @param clientId The clientId of the given producer client. */ +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class ProducerTopicStats(clientId: String) { private val valueFactory = (k: ClientIdTopic) => new ProducerTopicMetrics(k) private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](Some(valueFactory)) @@ -53,6 +54,7 @@ class ProducerTopicStats(clientId: String) { /** * Stores the topic stats information of each producer client in a (clientId -> ProducerTopicStats) map. */ +@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0") object ProducerTopicStatsRegistry { private val valueFactory = (k: String) => new ProducerTopicStats(k) private val globalStats = new Pool[String, ProducerTopicStats](Some(valueFactory)) diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index ec3c4ab342bb9..16242e68c3c91 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -27,6 +27,8 @@ import org.apache.kafka.common.network.NetworkReceive import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.utils.Utils._ +@deprecated("This object has been deprecated and will be removed in a future release." + + "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0") object SyncProducer { val RequestKey: Short = 0 val randomGenerator = new Random @@ -36,6 +38,8 @@ object SyncProducer { * Send a message set. */ @threadsafe +@deprecated("This class has been deprecated and will be removed in a future release." + + "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0") class SyncProducer(val config: SyncProducerConfig) extends Logging { private val lock = new Object() diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala index a08ce00a0aae7..a4b9aa028334d 100644 --- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala @@ -20,6 +20,8 @@ package kafka.producer import java.util.Properties import kafka.utils.VerifiableProperties +@deprecated("This class has been deprecated and will be removed in a future release." + + "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0") class SyncProducerConfig private (val props: VerifiableProperties) extends SyncProducerConfigShared { def this(originalProps: Properties) { this(new VerifiableProperties(originalProps)) @@ -33,6 +35,8 @@ class SyncProducerConfig private (val props: VerifiableProperties) extends SyncP val port = props.getInt("port") } +@deprecated("This trait has been deprecated and will be removed in a future release." + + "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0") trait SyncProducerConfigShared { val props: VerifiableProperties @@ -59,6 +63,8 @@ trait SyncProducerConfigShared { (1, Integer.MAX_VALUE)) } +@deprecated("This object has been deprecated and will be removed in a future release." + + "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0") object SyncProducerConfig { val DefaultClientId = "" val DefaultRequiredAcks : Short = 0 diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala index dd39de57b039a..d7273c1d9a45b 100644 --- a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala @@ -18,6 +18,8 @@ package kafka.producer.async import kafka.utils.VerifiableProperties +@deprecated("This trait has been deprecated and will be removed in a future release." + + "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0") trait AsyncProducerConfig { val props: VerifiableProperties diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 7abe48a36fc38..b79e64b9a57ef 100755 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -31,6 +31,7 @@ import java.util.concurrent.atomic._ import kafka.api.{TopicMetadata, ProducerRequest} import org.apache.kafka.common.utils.Utils +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class DefaultEventHandler[K,V](config: ProducerConfig, private val partitioner: Partitioner, private val encoder: Encoder[V], diff --git a/core/src/main/scala/kafka/producer/async/EventHandler.scala b/core/src/main/scala/kafka/producer/async/EventHandler.scala index e72400077dc69..3a17bfb37ebc2 100644 --- a/core/src/main/scala/kafka/producer/async/EventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/EventHandler.scala @@ -21,6 +21,7 @@ import kafka.producer.KeyedMessage /** * Handler that dispatches the batched data from the queue. */ +@deprecated("This trait has been deprecated and will be removed in a future release.", "0.10.0.0") trait EventHandler[K,V] { /** diff --git a/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala b/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala index 9ecdf7682b860..7779715a67472 100644 --- a/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala +++ b/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala @@ -20,6 +20,7 @@ package kafka.producer.async /** * Indicates that the given config parameter has invalid value */ +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class IllegalQueueStateException(message: String) extends RuntimeException(message) { def this() = this(null) } diff --git a/core/src/main/scala/kafka/producer/async/MissingConfigException.scala b/core/src/main/scala/kafka/producer/async/MissingConfigException.scala index 304e0b24ef25c..a42678b2eb204 100644 --- a/core/src/main/scala/kafka/producer/async/MissingConfigException.scala +++ b/core/src/main/scala/kafka/producer/async/MissingConfigException.scala @@ -18,6 +18,7 @@ package kafka.producer.async /* Indicates any missing configuration parameter */ +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class MissingConfigException(message: String) extends RuntimeException(message) { def this() = this(null) } diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala index 8a903f32c7fc3..d4237571fb839 100644 --- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala +++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala @@ -24,6 +24,7 @@ import kafka.producer.KeyedMessage import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class ProducerSendThread[K,V](val threadName: String, val queue: BlockingQueue[KeyedMessage[K,V]], val handler: EventHandler[K,V], diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java index b1ab64974128f..0b94902b8806b 100755 --- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java +++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java @@ -60,7 +60,7 @@ * The user need to provide the configuration file for 0.7 consumer and 0.8 producer. For 0.8 producer, * the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code. */ -@SuppressWarnings({"unchecked", "rawtypes"}) +@SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) public class KafkaMigrationTool { private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName()); private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";