Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/client/ClientUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ object ClientUtils extends Logging{
* @param producerConfig The producer's config
* @return topic metadata response
*/
@deprecated("This method has been deprecated and will be removed in a future release.", "0.10.0.0")
def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
var fetchMetaDataSucceeded: Boolean = false
var i: Int = 0
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/javaapi/producer/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import kafka.producer.ProducerConfig
import kafka.producer.KeyedMessage
import scala.collection.mutable

@deprecated("This class has been deprecated and will be removed in a future release. " +
"Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
{
def this(config: ProducerConfig) = this(new kafka.producer.Producer[K,V](config))
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
removeAllMetricsInList(KafkaMetricsGroup.consumerMetricNameList, clientId)
}

@deprecated("This method has been deprecated and will be removed in a future release.", since = "0.10.0.0")
def removeAllProducerMetrics(clientId: String) {
ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
ProducerTopicStatsRegistry.removeProducerTopicStats(clientId)
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/kafka/producer/BaseProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ import java.util.Properties

// A base producer used whenever we need to have options for both old and new producers;
// this class will be removed once we fully rolled out 0.9
@deprecated("This trait has been deprecated and will be removed in a future release." +
"Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
trait BaseProducer {
def send(topic: String, key: Array[Byte], value: Array[Byte])
def close()
}

@deprecated("This class has been deprecated and will be removed in a future release." +
"Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
class NewShinyProducer(producerProps: Properties) extends BaseProducer {
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
Expand All @@ -50,6 +54,8 @@ class NewShinyProducer(producerProps: Properties) extends BaseProducer {
}
}

@deprecated("This class has been deprecated and will be removed in a future release." +
"Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
class OldProducer(producerProps: Properties) extends BaseProducer {

// default to byte array partitioner
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import kafka.common.KafkaException
import kafka.utils.Logging
import kafka.client.ClientUtils


@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class BrokerPartitionInfo(producerConfig: ProducerConfig,
producerPool: ProducerPool,
topicPartitionInfo: HashMap[String, TopicMetadata])
Expand Down Expand Up @@ -101,4 +101,5 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,

}

@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package kafka.producer
import kafka.utils._
import org.apache.kafka.common.utils.Utils

@deprecated("This class has been deprecated and will be removed in a future release." +
"It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0")
class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner {
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(java.util.Arrays.hashCode(key.asInstanceOf[Array[Byte]])) % numPartitions
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/producer/DefaultPartitioner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package kafka.producer
import kafka.utils._
import org.apache.kafka.common.utils.Utils

@deprecated("This class has been deprecated and will be removed in a future release." +
"It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0")
class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
private val random = new java.util.Random

Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/producer/KeyedMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package kafka.producer
* A topic, key, and value.
* If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
*/
@deprecated("This class has been deprecated and will be removed in a future release." +
"Please use org.apache.kafka.clients.producer.ProducerRecord instead.", "0.10.0.0")
case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {
if(topic == null)
throw new IllegalArgumentException("Topic cannot be null.")
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/producer/Partitioner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ package kafka.producer
* Implementations will be constructed via reflection and are required to have a constructor that takes a single
* VerifiableProperties instance--this allows passing configuration properties into the partitioner implementation.
*/
@deprecated("This trait has been deprecated and will be removed in a future release." +
"Please use org.apache.kafka.clients.producer.Partitioner instead.", "0.10.0.0")
trait Partitioner {
/**
* Uses the key to calculate a partition bucket id for routing
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/producer/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThre
import kafka.serializer.Encoder
import kafka.utils._


@deprecated("This class has been deprecated and will be removed in a future release." +
"Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
class Producer[K,V](val config: ProducerConfig,
private val eventHandler: EventHandler[K,V]) // only for unit testing
extends Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@

package kafka.producer

@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerClosedException() extends RuntimeException("producer already closed") {
}
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/producer/ProducerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import kafka.utils.{CoreUtils, VerifiableProperties}
import kafka.message.NoCompressionCodec
import kafka.common.{InvalidConfigException, Config}

@deprecated("This object has been deprecated and will be removed in a future release." +
"Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
object ProducerConfig extends Config {
def validate(config: ProducerConfig) {
validateClientId(config.clientId)
Expand All @@ -48,6 +50,8 @@ object ProducerConfig extends Config {
}
}

@deprecated("This class has been deprecated and will be removed in a future release." +
"Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
class ProducerConfig private (val props: VerifiableProperties)
extends AsyncProducerConfig with SyncProducerConfigShared {
import ProducerConfig._
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/producer/ProducerPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import kafka.utils.Logging

import scala.collection.mutable.HashMap


@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
object ProducerPool {
/**
* Used in ProducerPool to initiate a SyncProducer connection with a broker.
Expand All @@ -40,6 +40,7 @@ object ProducerPool {
}
}

@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerPool(val config: ProducerConfig) extends Logging {
private val syncProducers = new HashMap[Int, SyncProducer]
private val lock = new Object()
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/producer/ProducerRequestStats.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
import kafka.utils.Pool
import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}

@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
val tags = metricId match {
case ClientIdAndBroker(clientId, brokerHost, brokerPort) => Map("clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort" -> brokerPort.toString)
Expand All @@ -36,6 +37,7 @@ class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup
* Tracks metrics of requests made by a given producer client to all brokers.
* @param clientId ClientId of the given producer
*/
@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerRequestStats(clientId: String) {
private val valueFactory = (k: ClientIdBroker) => new ProducerRequestMetrics(k)
private val stats = new Pool[ClientIdBroker, ProducerRequestMetrics](Some(valueFactory))
Expand All @@ -51,6 +53,7 @@ class ProducerRequestStats(clientId: String) {
/**
* Stores the request stats information of each producer client in a (clientId -> ProducerRequestStats) map.
*/
@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
object ProducerRequestStatsRegistry {
private val valueFactory = (k: String) => new ProducerRequestStats(k)
private val globalStats = new Pool[String, ProducerRequestStats](Some(valueFactory))
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/producer/ProducerStats.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import kafka.metrics.KafkaMetricsGroup
import java.util.concurrent.TimeUnit
import kafka.utils.Pool

@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerStats(clientId: String) extends KafkaMetricsGroup {
val tags: Map[String, String] = Map("clientId" -> clientId)
val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS, tags)
Expand All @@ -30,6 +31,7 @@ class ProducerStats(clientId: String) extends KafkaMetricsGroup {
/**
* Stores metrics of serialization and message sending activity of each producer client in a (clientId -> ProducerStats) map.
*/
@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
object ProducerStatsRegistry {
private val valueFactory = (k: String) => new ProducerStats(k)
private val statsRegistry = new Pool[String, ProducerStats](Some(valueFactory))
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/producer/ProducerTopicStats.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
import kafka.utils.{Pool, threadsafe}
import java.util.concurrent.TimeUnit


@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
@threadsafe
class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
val tags = metricId match {
Expand All @@ -38,6 +38,7 @@ class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
* Tracks metrics for each topic the given producer client has produced data to.
* @param clientId The clientId of the given producer client.
*/
@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerTopicStats(clientId: String) {
private val valueFactory = (k: ClientIdTopic) => new ProducerTopicMetrics(k)
private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](Some(valueFactory))
Expand All @@ -53,6 +54,7 @@ class ProducerTopicStats(clientId: String) {
/**
* Stores the topic stats information of each producer client in a (clientId -> ProducerTopicStats) map.
*/
@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
object ProducerTopicStatsRegistry {
private val valueFactory = (k: String) => new ProducerTopicStats(k)
private val globalStats = new Pool[String, ProducerTopicStats](Some(valueFactory))
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/producer/SyncProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import org.apache.kafka.common.network.NetworkReceive
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.utils.Utils._

@deprecated("This object has been deprecated and will be removed in a future release." +
"Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
object SyncProducer {
val RequestKey: Short = 0
val randomGenerator = new Random
Expand All @@ -36,6 +38,8 @@ object SyncProducer {
* Send a message set.
*/
@threadsafe
@deprecated("This class has been deprecated and will be removed in a future release." +
"Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
class SyncProducer(val config: SyncProducerConfig) extends Logging {

private val lock = new Object()
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/kafka/producer/SyncProducerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package kafka.producer
import java.util.Properties
import kafka.utils.VerifiableProperties

@deprecated("This class has been deprecated and will be removed in a future release." +
"Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
class SyncProducerConfig private (val props: VerifiableProperties) extends SyncProducerConfigShared {
def this(originalProps: Properties) {
this(new VerifiableProperties(originalProps))
Expand All @@ -33,6 +35,8 @@ class SyncProducerConfig private (val props: VerifiableProperties) extends SyncP
val port = props.getInt("port")
}

@deprecated("This trait has been deprecated and will be removed in a future release." +
"Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
trait SyncProducerConfigShared {
val props: VerifiableProperties

Expand All @@ -59,6 +63,8 @@ trait SyncProducerConfigShared {
(1, Integer.MAX_VALUE))
}

@deprecated("This object has been deprecated and will be removed in a future release." +
"Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
object SyncProducerConfig {
val DefaultClientId = ""
val DefaultRequiredAcks : Short = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package kafka.producer.async

import kafka.utils.VerifiableProperties

@deprecated("This trait has been deprecated and will be removed in a future release." +
"Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
trait AsyncProducerConfig {
val props: VerifiableProperties

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import java.util.concurrent.atomic._
import kafka.api.{TopicMetadata, ProducerRequest}
import org.apache.kafka.common.utils.Utils

@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class DefaultEventHandler[K,V](config: ProducerConfig,
private val partitioner: Partitioner,
private val encoder: Encoder[V],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import kafka.producer.KeyedMessage
/**
* Handler that dispatches the batched data from the queue.
*/
@deprecated("This trait has been deprecated and will be removed in a future release.", "0.10.0.0")
trait EventHandler[K,V] {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kafka.producer.async
/**
* Indicates that the given config parameter has invalid value
*/
@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class IllegalQueueStateException(message: String) extends RuntimeException(message) {
def this() = this(null)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kafka.producer.async

/* Indicates any missing configuration parameter */
@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class MissingConfigException(message: String) extends RuntimeException(message) {
def this() = this(null)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import kafka.producer.KeyedMessage
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge

@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerSendThread[K,V](val threadName: String,
val queue: BlockingQueue[KeyedMessage[K,V]],
val handler: EventHandler[K,V],
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/KafkaMigrationTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
* The user need to provide the configuration file for 0.7 consumer and 0.8 producer. For 0.8 producer,
* the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
public class KafkaMigrationTool {
private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
Expand Down