Skip to content

KAFKA-14577: Move the scala ConsoleProducer from core to tools module#13214

Closed
Hangleton wants to merge 8 commits intoapache:trunkfrom
Hangleton:move-console-producer
Closed

KAFKA-14577: Move the scala ConsoleProducer from core to tools module#13214
Hangleton wants to merge 8 commits intoapache:trunkfrom
Hangleton:move-console-producer

Conversation

@Hangleton
Copy link
Copy Markdown

@Hangleton Hangleton commented Feb 7, 2023

Code move for the consoleProducer as part of KAFKA-14525 - Move CLI tools from core to tools module. The copy is as identical as possible as the original class and provides iso-functionality. Unit tests are moved as well for ConsoleProducer and LineMessageReader. One minor additional unit test for the console producer.

Tests:

  • Refactored unit tests ConsoleProducerTest and LineMessageReaderTest.
  • Manual exercise of the CLI tool.
Option                                   Description                            
------                                   -----------                            
--batch-size <Integer: size>             Number of messages to send in a single 
                                           batch if they are not being sent     
                                           synchronously. please note that this 
                                           option will be replaced if max-      
                                           partition-memory-bytes is also set   
                                           (default: 16384)                     
--bootstrap-server <String: server to    REQUIRED unless --broker-list          
  connect to>                              (deprecated) is specified. The server
                                           (s) to connect to. The broker list   
                                           string in the form HOST1:PORT1,HOST2:
                                           PORT2.                               
--broker-list <String: broker-list>      DEPRECATED, use --bootstrap-server     
                                           instead; ignored if --bootstrap-     
                                           server is specified.  The broker     
                                           list string in the form HOST1:PORT1, 
                                           HOST2:PORT2.                         
--compression-codec [String:             The compression codec: either 'none',  
  compression-codec]                       'gzip', 'snappy', 'lz4', or 'zstd'.  
                                           If specified without value, then it  
                                           defaults to 'gzip'                   
--help                                   Print usage information.               
--line-reader <String: reader_class>     The class name of the class to use for 
                                           reading lines from standard in. By   
                                           default each line is read as a       
                                           separate message. (default: org.     
                                           apache.kafka.tools.                  
                                           ConsoleProducer$LineMessageReader)   
--max-block-ms <Long: max block on       The max time that the producer will    
  send>                                    block for during a send request.     
                                           (default: 60000)                     
--max-memory-bytes <Long: total memory   The total memory used by the producer  
  in bytes>                                to buffer records waiting to be sent 
                                           to the server. This is the option to 
                                           control `buffer.memory` in producer  
                                           configs. (default: 33554432)         
--max-partition-memory-bytes <Integer:   The buffer size allocated for a        
  memory in bytes per partition>           partition. When records are received 
                                           which are smaller than this size the 
                                           producer will attempt to             
                                           optimistically group them together   
                                           until this size is reached. This is  
                                           the option to control `batch.size`   
                                           in producer configs. (default: 16384)
--message-send-max-retries <Integer>     Brokers can fail receiving the message 
                                           for multiple reasons, and being      
                                           unavailable transiently is just one  
                                           of them. This property specifies the 
                                           number of retries before the         
                                           producer give up and drop this       
                                           message. This is the option to       
                                           control `retries` in producer        
                                           configs. (default: 3)                
--metadata-expiry-ms <Long: metadata     The period of time in milliseconds     
  expiration interval>                     after which we force a refresh of    
                                           metadata even if we haven't seen any 
                                           leadership changes. This is the      
                                           option to control `metadata.max.age. 
                                           ms` in producer configs. (default:   
                                           300000)                              
--producer-property <String:             A mechanism to pass user-defined       
  producer_prop>                           properties in the form key=value to  
                                           the producer.                        
--producer.config <String: config file>  Producer config properties file. Note  
                                           that [producer-property] takes       
                                           precedence over this config.         
--property <String: prop>                A mechanism to pass user-defined       
                                           properties in the form key=value to  
                                           the message reader. This allows      
                                           custom configuration for a user-     
                                           defined message reader.Default       
                                           properties include:                  
                                          parse.key=false                       
                                          parse.headers=false                   
                                          ignore.error=false                    
                                          key.separator=\t                      
                                          headers.delimiter=\t                  
                                          headers.separator=,                   
                                          headers.key.separator=:               
                                          null.marker=   When set, any fields   
                                           (key, value and headers) equal to    
                                           this will be replaced by null        
                                         Default parsing pattern when:          
                                          parse.headers=true and parse.key=true:
                                           "h1:v1,h2:v2...\tkey\tvalue"         
                                          parse.key=true:                       
                                           "key\tvalue"                         
                                          parse.headers=true:                   
                                           "h1:v1,h2:v2...\tvalue"              
--reader-config <String: config file>    Config properties file for the message 
                                           reader. Note that [property] takes   
                                           precedence over this config.         
--request-required-acks <String:         The required `acks` of the producer    
  request required acks>                   requests (default: -1)               
--request-timeout-ms <Integer: request   The ack timeout of the producer        
  timeout ms>                              requests. Value must be non-negative 
                                           and non-zero. (default: 1500)        
--retry-backoff-ms <Long>                Before each retry, the producer        
                                           refreshes the metadata of relevant   
                                           topics. Since leader election takes  
                                           a bit of time, this property         
                                           specifies the amount of time that    
                                           the producer waits before refreshing 
                                           the metadata. This is the option to  
                                           control `retry.backoff.ms` in        
                                           producer configs. (default: 100)     
--socket-buffer-size <Integer: size>     The size of the tcp RECV size. This is 
                                           the option to control `send.buffer.  
                                           bytes` in producer configs.          
                                           (default: 102400)                    
--sync                                   If set message send requests to the    
                                           brokers are synchronously, one at a  
                                           time as they arrive.                 
--timeout <Long: timeout_ms>             If set and the producer is running in  
                                           asynchronous mode, this gives the    
                                           maximum amount of time a message     
                                           will queue awaiting sufficient batch 
                                           size. The value is given in ms. This 
                                           is the option to control `linger.ms` 
                                           in producer configs. (default: 1000) 
--topic <String: topic>                  REQUIRED: The topic id to produce      
                                           messages to.                         
--version                                Display Kafka version.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@Hangleton Hangleton changed the title [KAFKA-14577] Move the scala ConsoleProducer from core to tools module KAFKA-14577: Move the scala ConsoleProducer from core to tools module Feb 7, 2023
@Hangleton
Copy link
Copy Markdown
Author

Hangleton commented Feb 8, 2023

@fvaleri @chia7712

Comment thread bin/kafka-console-producer.sh Outdated
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the same change needs to be done in bin/windows/kafka-console-producer.bat

@Hangleton Hangleton force-pushed the move-console-producer branch from 33eb0a7 to 635cb58 Compare February 8, 2023 14:25
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this class is only used by the ConsoleProducer and has one implementation. If no further used is intended, it can be removed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a breaking change, because there is an option that allows to provide your own message reader implementation.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies, that is right. Thanks for correcting. On that note, this interface moved from kafka.common.MessageReader to org.apache.kafka.tools.MessageReader. Any foreign implementation will have to be updated accordingly. What is the convention to follow is in this case - preserve the same package to avoid breaking dependencies or change the package to match the new structure?

Copy link
Copy Markdown
Contributor

@fvaleri fvaleri Feb 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is tricky. It's not a class from the public API, but it's exposed via the command line and definitely used (search for AvroMessageReader for example). I also found the still open KIP-641 which deals with this interface.

We should have a deprecation period, but at the same time we don't want to depend on core. Given that checkstyle can only handle a single root package, the only solution I see is to create a :tools:deprecated sub-module containing kafka.common.MessageReader, to be removed in Kafka 4.0.

@mimaison @showuon WDYT?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed on taking an intermediate step to preserve FQN compatibility for the SPI.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hangleton Do you already have a strategy to deal with this? How are you planning to provide a deprecation period? Can we generalize this approach?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • For MessageReader: maybe we could provide a depreciation period until Apache Kafka 4.0 by exposing the class defined in KIP-641 and keep the interface kafka.common.MessageReader inheriting from org.apache.kafka.common.MessageReader to avoid breaking the implementations above? The class org.apache.kafka.common.MessageReader could be put in the clients module (KIP-641 suggests to have MessageReader in this module since it is a public interface).
  • For the JMX Tool: maybe we could keep the existing class with a deprecation warning as you did in PR-13195 and create a shell and bat scripts as the public contract for the new class?
  • kafka.tools.StateChangeLogMerger could be removed if it isn't used as mentioned in PR-13171.
  • There is a missing BAT script for FeatureCommand although the shell script is defined. This could be added.
  • I skimmed through the other command/tool classes and all of them seem to be exposed via a script so that there shouldn't be any compatibility problem there? What other changes do we want to address?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. This is more or less what I had in mind. I'm going to go through all tools to see if there is any further issue and then we can discuss the policy with the community and possibly have a vote.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @fvaleri and apologies for the delay, I will look at the KIP. Thanks!

Copy link
Copy Markdown
Contributor

@fvaleri fvaleri left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Hangleton, thanks for working on this and it looks great.

I just leaved few comments and ideas for improvements. Let me know what you think.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can get rid of the final extra space.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not using Header and RecordHeader?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are perfect, indeed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static void validatePortOrDie(OptionParser parser, String hostPort) {
public static void validatePortOrExit(OptionParser parser, String hostPort) {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ToolsUtils.validatePortOrDie(parser, brokerHostsAndPorts());
ToolsUtils.validatePortOrExit(parser, brokerHostsAndPorts());

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that there is no hard dependency with the parent class and there is a dedicated test, I would suggest to put LineMessageReader in a separate file. Also note that the NPath complexity lives here, so this would be the class to suppress in checkstyle.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes senses.

@fvaleri
Copy link
Copy Markdown
Contributor

fvaleri commented Feb 9, 2023

@showuon if you have some time, this looks almost ready.

@Hangleton
Copy link
Copy Markdown
Author

Thank you for your review Frederico (@fvaleri). I pushed the changes to address all your comments.

@Hangleton
Copy link
Copy Markdown
Author

Please ignore the request for review Frederico - was learning about the refresh icon on Github...

Copy link
Copy Markdown
Contributor

@fvaleri fvaleri left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hangleton should we move to RecordReader interface now and then call another round of review? Also note this comment.

@Hangleton
Copy link
Copy Markdown
Author

@Hangleton should we move to RecordReader interface now and then call another round of review? Also note this comment.

Thanks Frederico for the follow-up and call-out. I will rebase and update the PR.

@fvaleri
Copy link
Copy Markdown
Contributor

fvaleri commented Jul 7, 2023

Hi @Hangleton, what's the state of this PR? Let me know if you need some help/review.

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Oct 6, 2023

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch)

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions Bot added the stale Stale PRs label Oct 6, 2023
@nizhikov
Copy link
Copy Markdown
Contributor

nizhikov commented Oct 6, 2023

Hello @Hangleton . Do you need help with review of this PR?
Are you still workin on it?

@github-actions github-actions Bot removed the stale Stale PRs label Oct 7, 2023
@mimaison
Copy link
Copy Markdown
Member

mimaison commented Oct 7, 2024

Done in #17157, closing

@mimaison mimaison closed this Oct 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants