Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,5 @@ results
tests/results
.ducktape
tests/.ducktape
docs/producer_config.html
docs/consumer_config.html
docs/kafka_config.html
docs/connect_config.html

docs/generated/
47 changes: 38 additions & 9 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ ext {
mavenUrl = project.hasProperty('mavenUrl') ? project.mavenUrl : ''
mavenUsername = project.hasProperty('mavenUsername') ? project.mavenUsername : ''
mavenPassword = project.hasProperty('mavenPassword') ? project.mavenPassword : ''

generatedDocsDir = new File("${project.rootDir}/docs/generated")
}

apply from: file('wrapper.gradle')
Expand Down Expand Up @@ -303,25 +305,51 @@ project(':core') {
into "$buildDir/dependant-libs-${scalaVersion}"
}

tasks.create(name: "genProducerConfigDocs", dependsOn:jar, type: JavaExec) {
task genProtocolErrorDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
main = 'org.apache.kafka.common.protocol.Errors'
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "protocol_errors.html").newOutputStream()
}

task genProtocolApiKeyDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
main = 'org.apache.kafka.common.protocol.ApiKeys'
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "protocol_api_keys.html").newOutputStream()
}

task genProtocolMessageDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
main = 'org.apache.kafka.common.protocol.Protocol'
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "protocol_messages.html").newOutputStream()
}

task genProducerConfigDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
main = 'org.apache.kafka.clients.producer.ProducerConfig'
standardOutput = new File('docs/producer_config.html').newOutputStream()
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "producer_config.html").newOutputStream()
}

tasks.create(name: "genConsumerConfigDocs", dependsOn:jar, type: JavaExec) {
task genConsumerConfigDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
main = 'org.apache.kafka.clients.consumer.ConsumerConfig'
standardOutput = new File('docs/consumer_config.html').newOutputStream()
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "consumer_config.html").newOutputStream()
}

tasks.create(name: "genKafkaConfigDocs", dependsOn:jar, type: JavaExec) {
task genKafkaConfigDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
main = 'kafka.server.KafkaConfig'
standardOutput = new File('docs/kafka_config.html').newOutputStream()
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "kafka_config.html").newOutputStream()
}

task siteDocsTar(dependsOn: ['genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', ':connect:runtime:genConnectConfigDocs'], type: Tar) {
task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs',
':connect:runtime:genConnectConfigDocs'], type: Tar) {
classifier = 'site-docs'
compression = Compression.GZIP
from project.file("../docs")
Expand Down Expand Up @@ -764,10 +792,11 @@ project(':connect:runtime') {
}
test.dependsOn('checkstyleMain', 'checkstyleTest')

tasks.create(name: "genConnectConfigDocs", dependsOn:jar, type: JavaExec) {
task genConnectConfigDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
main = 'org.apache.kafka.connect.runtime.distributed.DistributedConfig'
standardOutput = new File('docs/connect_config.html').newOutputStream()
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "connect_config.html").newOutputStream()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,30 @@ private ApiKeys(int id, String name) {
public static ApiKeys forId(int id) {
return codeToType[id];
}

private static String toHtml() {
final StringBuilder b = new StringBuilder();
b.append("<table class=\"data-table\"><tbody>\n");
b.append("<tr>");
b.append("<th>Name</th>\n");
b.append("<th>Key</th>\n");
b.append("</tr>");
for (ApiKeys key : ApiKeys.values()) {
b.append("<tr>\n");
b.append("<td>");
b.append(key.name);
b.append("</td>");
b.append("<td>");
b.append(key.id);
b.append("</td>");
b.append("</tr>\n");
}
b.append("</table>\n");
return b.toString();
}

public static void main(String[] args) {
System.out.println(toHtml());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
/**
* This class contains all the client-server errors--those errors that must be sent from the server to the client. These
* are thus part of the protocol. The names can be changed but the error code cannot.
*
*
* Do not add exceptions that occur only on the client or only on the server here.
*/
public enum Errors {
Expand Down Expand Up @@ -159,4 +159,37 @@ public static Errors forException(Throwable t) {
Errors error = classToError.get(t.getClass());
return error == null ? UNKNOWN : error;
}

private static String toHtml() {
final StringBuilder b = new StringBuilder();
b.append("<table class=\"data-table\"><tbody>\n");
b.append("<tr>");
b.append("<th>Error</th>\n");
b.append("<th>Code</th>\n");
b.append("<th>Retriable</th>\n");
b.append("<th>Description</th>\n");
b.append("</tr>\n");
for (Errors error : Errors.values()) {
b.append("<tr>");
b.append("<td>");
b.append(error.name());
b.append("</td>");
b.append("<td>");
b.append(error.code());
b.append("</td>");
b.append("<td>");
b.append(error.exception() != null && error.exception() instanceof RetriableException ? "True" : "False");
b.append("</td>");
b.append("<td>");
b.append(error.exception() != null ? error.exception().getMessage() : "");
b.append("</td>");
b.append("</tr>\n");
}
b.append("</table>\n");
return b.toString();
}

public static void main(String[] args) {
System.out.println(toHtml());
}
}
166 changes: 166 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Type;

import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.common.protocol.types.Type.BYTES;
import static org.apache.kafka.common.protocol.types.Type.INT16;
Expand Down Expand Up @@ -708,4 +714,164 @@ public class Protocol {
+ " but " + RESPONSES[api.id].length + " response versions.");
}

private static String indentString(int size) {
StringBuilder b = new StringBuilder(size);
for (int i = 0; i < size; i++)
b.append(" ");
return b.toString();
}

private static void schemaToBnfHtml(Schema schema, StringBuilder b, int indentSize) {
final String indentStr = indentString(indentSize);
final Map<String, Type> subTypes = new LinkedHashMap<>();

// Top level fields
for (Field field: schema.fields()) {
if (field.type instanceof ArrayOf) {
b.append("[");
b.append(field.name);
b.append("] ");
Type innerType = ((ArrayOf) field.type).type();
if (innerType instanceof Schema && !subTypes.containsKey(field.name))
subTypes.put(field.name, (Schema) innerType);
} else if (field.type instanceof Schema) {
b.append(field.name);
b.append(" ");
if (!subTypes.containsKey(field.name))
subTypes.put(field.name, (Schema) field.type);
} else {
b.append(field.name);
b.append(" ");
if (!subTypes.containsKey(field.name))
subTypes.put(field.name, field.type);
}
}
b.append("\n");

// Sub Types/Schemas
for (Map.Entry<String, Type> entry: subTypes.entrySet()) {
if (entry.getValue() instanceof Schema) {
// Complex Schema Type
b.append(indentStr);
b.append(entry.getKey());
b.append(" => ");
schemaToBnfHtml((Schema) entry.getValue(), b, indentSize + 2);
} else {
// Standard Field Type
b.append(indentStr);
b.append(entry.getKey());
b.append(" => ");
b.append(entry.getValue());
b.append("\n");
}
}
}

private static void populateSchemaFields(Schema schema, Set<Field> fields) {
for (Field field: schema.fields()) {
fields.add(field);
if (field.type instanceof ArrayOf) {
Type innerType = ((ArrayOf) field.type).type();
if (innerType instanceof Schema)
populateSchemaFields((Schema) innerType, fields);
} else if (field.type instanceof Schema)
populateSchemaFields((Schema) field.type, fields);
}
}

private static void schemaToFieldTableHtml(Schema schema, StringBuilder b) {
Set<Field> fields = new LinkedHashSet<>();
populateSchemaFields(schema, fields);

b.append("<table class=\"data-table\"><tbody>\n");
b.append("<tr>");
b.append("<th>Field</th>\n");
b.append("<th>Description</th>\n");
b.append("</tr>");
for (Field field : fields) {
b.append("<tr>\n");
b.append("<td>");
b.append(field.name);
b.append("</td>");
b.append("<td>");
b.append(field.doc);
b.append("</td>");
b.append("</tr>\n");
}
b.append("</table>\n");
}

public static String toHtml() {
final StringBuilder b = new StringBuilder();
b.append("<h5>Headers:</h5>\n");

b.append("<pre>");
b.append("Request Header => ");
schemaToBnfHtml(REQUEST_HEADER, b, 2);
b.append("</pre>\n");
schemaToFieldTableHtml(REQUEST_HEADER, b);

b.append("<pre>");
b.append("Response Header => ");
schemaToBnfHtml(RESPONSE_HEADER, b, 2);
b.append("</pre>\n");
schemaToFieldTableHtml(RESPONSE_HEADER, b);

for (ApiKeys key : ApiKeys.values()) {
// Key
b.append("<h5>");
b.append(key.name);
b.append(" API (Key: ");
b.append(key.id);
b.append("):</h5>\n\n");
// Requests
b.append("<b>Requests:</b><br>\n");
Schema[] requests = REQUESTS[key.id];
for (int i = 0; i < requests.length; i++) {
Schema schema = requests[i];
// Schema
if (schema != null) {
b.append("<p>");
// Version header
b.append("<pre>");
b.append(key.name);
b.append(" Request (Version: ");
b.append(i);
b.append(") => ");
schemaToBnfHtml(requests[i], b, 2);
b.append("</pre>");
schemaToFieldTableHtml(requests[i], b);
}
b.append("</p>\n");
}

// Responses
b.append("<b>Responses:</b><br>\n");
Schema[] responses = RESPONSES[key.id];
for (int i = 0; i < responses.length; i++) {
Schema schema = responses[i];
// Schema
if (schema != null) {
b.append("<p>");
// Version header
b.append("<pre>");
b.append(key.name);
b.append(" Response (Version: ");
b.append(i);
b.append(") => ");
schemaToBnfHtml(responses[i], b, 2);
b.append("</pre>");
schemaToFieldTableHtml(responses[i], b);
}
b.append("</p>\n");
}
}

return b.toString();
}

public static void main(String[] args) {
System.out.println(toHtml());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,8 @@ public Type type() {
return type;
}

public Schema schema() {
return schema;
}

}
8 changes: 4 additions & 4 deletions docs/configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ <h3><a id="brokerconfigs" href="#brokerconfigs">3.1 Broker Configs</a></h3>

Topic-level configurations and defaults are discussed in more detail <a href="#topic-config">below</a>.

<!--#include virtual="kafka_config.html" -->
<!--#include virtual="generated/kafka_config.html" -->

<p>More details about broker configuration can be found in the scala class <code>kafka.server.KafkaConfig</code>.</p>

Expand Down Expand Up @@ -150,7 +150,7 @@ <h3><a id="brokerconfigs" href="#brokerconfigs">3.1 Broker Configs</a></h3>
<h3><a id="producerconfigs" href="#producerconfigs">3.2 Producer Configs</a></h3>

Below is the configuration of the Java producer:
<!--#include virtual="producer_config.html" -->
<!--#include virtual="generated/producer_config.html" -->

<p>
For those interested in the legacy Scala producer configs, information can be found <a href="http://kafka.apache.org/082/documentation.html#producerconfigs">
Expand Down Expand Up @@ -330,7 +330,7 @@ <h4><a id="oldconsumerconfigs" href="#oldconsumerconfigs">3.3.1 Old Consumer Con

<h4><a id="newconsumerconfigs" href="#newconsumerconfigs">3.3.2 New Consumer Configs</a></h4>
Since 0.9.0.0 we have been working on a replacement for our existing simple and high-level consumers. The code is considered beta quality. Below is the configuration for the new consumer:
<!--#include virtual="consumer_config.html" -->
<!--#include virtual="generated/consumer_config.html" -->

<h3><a id="connectconfigs" href="#connectconfigs">3.4 Kafka Connect Configs</a></h3>
<!--#include virtual="connect_config.html" -->
<!--#include virtual="generated/connect_config.html" -->
Loading