Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,4 @@ tests/results
.ducktape
tests/.ducktape

docs/producer_config.html
docs/consumer_config.html
docs/kafka_config.html
docs/connect_config.html
docs/generated/
45 changes: 37 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ ext {

userShowStandardStreams = project.hasProperty("showStandardStreams") ? showStandardStreams : null

generatedDocsDir = new File("${project.rootDir}/docs/generated")
}

apply from: "$rootDir/gradle/dependencies.gradle"
Expand Down Expand Up @@ -328,25 +329,52 @@ project(':core') {
into "$buildDir/dependant-libs-${versions.scala}"
}

// Writes the protocol error-code table to docs/generated/protocol_errors.html.
task genProtocolErrorDocs(type: JavaExec) {
    classpath = sourceSets.main.runtimeClasspath
    // Errors.main() (not Protocol) emits the error-code table; previously all
    // three protocol doc tasks pointed at Protocol, so protocol_errors.html
    // would have contained the message documentation instead of the errors.
    main = 'org.apache.kafka.common.protocol.Errors'
    if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
    standardOutput = new File(generatedDocsDir, "protocol_errors.html").newOutputStream()
}

// Writes the API-key table to docs/generated/protocol_api_keys.html.
task genProtocolApiKeyDocs(type: JavaExec) {
    classpath = sourceSets.main.runtimeClasspath
    // ApiKeys.main() (not Protocol) emits the API-key table; previously all
    // three protocol doc tasks pointed at Protocol, so protocol_api_keys.html
    // would have contained the message documentation instead of the key list.
    main = 'org.apache.kafka.common.protocol.ApiKeys'
    if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
    standardOutput = new File(generatedDocsDir, "protocol_api_keys.html").newOutputStream()
}

// Writes the full protocol message documentation (request/response headers and
// the per-API, per-version schemas) to docs/generated/protocol_messages.html
// by running Protocol.main(), which prints toHtml() to stdout.
task genProtocolMessageDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
main = 'org.apache.kafka.common.protocol.Protocol'
// NOTE(review): mkdirs() and newOutputStream() execute at Gradle configuration
// time, so the output file is created/opened on every build invocation, not
// only when this task runs -- confirm this is acceptable.
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "protocol_messages.html").newOutputStream()
}

// Writes the Java producer configuration table to
// docs/generated/producer_config.html (included by docs/configuration.html).
// The stale pre-change output line ("$rootDir/docs/producer_config.html") that
// was interleaved with this definition has been dropped; only the new
// generated-docs path is assigned.
task genProducerConfigDocs(type: JavaExec) {
    classpath = sourceSets.main.runtimeClasspath
    main = 'org.apache.kafka.clients.producer.ProducerConfig'
    if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
    standardOutput = new File(generatedDocsDir, "producer_config.html").newOutputStream()
}

// Writes the new-consumer configuration table to
// docs/generated/consumer_config.html (included by docs/configuration.html).
// The stale pre-change lines (old tasks.create header and the old
// "$rootDir/docs/consumer_config.html" output path) interleaved with this
// definition have been dropped; only the new definition remains.
task genConsumerConfigDocs(type: JavaExec) {
    classpath = sourceSets.main.runtimeClasspath
    main = 'org.apache.kafka.clients.consumer.ConsumerConfig'
    if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
    standardOutput = new File(generatedDocsDir, "consumer_config.html").newOutputStream()
}

// Writes the broker configuration table to docs/generated/kafka_config.html
// (included by docs/configuration.html). The stale pre-change lines (old
// tasks.create header and old "$rootDir/docs/kafka_config.html" output path)
// interleaved with this definition have been dropped.
task genKafkaConfigDocs(type: JavaExec) {
    classpath = sourceSets.main.runtimeClasspath
    main = 'kafka.server.KafkaConfig'
    if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
    standardOutput = new File(generatedDocsDir, "kafka_config.html").newOutputStream()
}

task siteDocsTar(dependsOn: ['genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', ':connect:runtime:genConnectConfigDocs'], type: Tar) {
task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs',
':connect:runtime:genConnectConfigDocs'], type: Tar) {
classifier = 'site-docs'
compression = Compression.GZIP
from project.file("../docs")
Expand Down Expand Up @@ -702,7 +730,8 @@ project(':connect:runtime') {
// Writes the Kafka Connect distributed-worker configuration table to
// docs/generated/connect_config.html. The stale pre-change output line
// ("$rootDir/docs/connect_config.html") interleaved with this definition has
// been dropped; only the new generated-docs path is assigned.
tasks.create(name: "genConnectConfigDocs", dependsOn:jar, type: JavaExec) {
    classpath = sourceSets.main.runtimeClasspath
    main = 'org.apache.kafka.connect.runtime.distributed.DistributedConfig'
    if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
    standardOutput = new File(generatedDocsDir, "connect_config.html").newOutputStream()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,30 @@ private ApiKeys(int id, String name) {
public static ApiKeys forId(int id) {
return codeToType[id];
}

/**
 * Renders every API key as a row of an HTML table (name and numeric id) for
 * inclusion in the generated protocol documentation.
 *
 * @return the HTML table as a string
 */
private static String toHtml() {
    final StringBuilder b = new StringBuilder();
    b.append("<table class=\"data-table\"><tbody>\n");
    b.append("<tr>");
    b.append("<th>Name</th>\n");
    b.append("<th>Key</th>\n");
    b.append("</tr>");
    for (ApiKeys key : ApiKeys.values()) {
        b.append("<tr>\n");
        b.append("<td>");
        b.append(key.name);
        b.append("</td>");
        b.append("<td>");
        b.append(key.id);
        b.append("</td>");
        b.append("</tr>\n");
    }
    // Close <tbody> explicitly so the emitted markup is balanced
    // (the opening tag above includes <tbody> but it was never closed).
    b.append("</tbody></table>\n");
    return b.toString();
}

/** Command-line entry point used by the build to print the API-key table to stdout. */
public static void main(String[] args) {
    final String html = toHtml();
    System.out.println(html);
}

}
34 changes: 34 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
Expand Down Expand Up @@ -208,4 +209,37 @@ public static Errors forException(Throwable t) {
}
return UNKNOWN;
}

/**
 * Renders every error code as a row of an HTML table: enum name, numeric code,
 * whether the mapped exception is retriable, and the exception message as the
 * description.
 *
 * @return the HTML table as a string
 */
private static String toHtml() {
    final StringBuilder b = new StringBuilder();
    b.append("<table class=\"data-table\"><tbody>\n");
    b.append("<tr>");
    b.append("<th>Error</th>\n");
    b.append("<th>Code</th>\n");
    b.append("<th>Retriable</th>\n");
    b.append("<th>Description</th>\n");
    b.append("</tr>\n");
    for (Errors error : Errors.values()) {
        // Hoisted: the exception is consulted three times below.
        final Throwable exception = error.exception();
        b.append("<tr>");
        b.append("<td>");
        b.append(error.name());
        b.append("</td>");
        b.append("<td>");
        b.append(error.code());
        b.append("</td>");
        b.append("<td>");
        // instanceof is null-safe (yields false for null), so the previous
        // explicit null check was redundant.
        b.append(exception instanceof RetriableException ? "True" : "False");
        b.append("</td>");
        b.append("<td>");
        b.append(exception != null ? exception.getMessage() : "");
        b.append("</td>");
        b.append("</tr>\n");
    }
    // Close <tbody> explicitly so the emitted markup is balanced.
    b.append("</tbody></table>\n");
    return b.toString();
}

/** Command-line entry point used by the build to print the error-code table to stdout. */
public static void main(String[] args) {
    final String html = toHtml();
    System.out.println(html);
}
}
166 changes: 166 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Type;

import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.common.protocol.types.Type.BYTES;
import static org.apache.kafka.common.protocol.types.Type.INT16;
Expand Down Expand Up @@ -750,4 +756,164 @@ public class Protocol {
+ " but " + RESPONSES[api.id].length + " response versions.");
}

/**
 * Returns a string consisting of {@code size} space characters, used to indent
 * nested schema lines in the BNF output.
 */
private static String indentString(int size) {
    final StringBuilder indent = new StringBuilder(size);
    int remaining = size;
    while (remaining-- > 0)
        indent.append(' ');
    return indent.toString();
}

/**
 * Appends a BNF-style description of {@code schema} to {@code b}: first the
 * top-level field list on one line, then one line per distinct named sub-type,
 * recursing into nested schemas with two extra spaces of indentation.
 */
private static void schemaToBnfHtml(Schema schema, StringBuilder b, int indentSize) {
    final String indentStr = indentString(indentSize);
    final Map<String, Type> subTypes = new LinkedHashMap<>();

    // Top-level fields: arrays render as [name], everything else as a bare name.
    for (Field field: schema.fields()) {
        if (field.type instanceof ArrayOf) {
            b.append("[");
            b.append(field.name);
            b.append("] ");
            // NOTE(review): only schema-valued array elements are recorded as
            // sub-types here; arrays of primitive element types are never
            // expanded below -- confirm that is intended.
            Type innerType = ((ArrayOf) field.type).type();
            if (innerType instanceof Schema && !subTypes.containsKey(field.name))
                subTypes.put(field.name, (Schema) innerType);
        } else {
            // Schema-valued and primitive fields were recorded identically by
            // the two original branches, so they are merged; the distinction is
            // made when the sub-types are rendered below.
            b.append(field.name);
            b.append(" ");
            if (!subTypes.containsKey(field.name))
                subTypes.put(field.name, field.type);
        }
    }
    b.append("\n");

    // Sub-types: the "name => " prefix is common to both cases, so it is
    // emitted once before branching.
    for (Map.Entry<String, Type> entry: subTypes.entrySet()) {
        b.append(indentStr);
        b.append(entry.getKey());
        b.append(" => ");
        if (entry.getValue() instanceof Schema) {
            // Nested schema: recurse with a deeper indent.
            schemaToBnfHtml((Schema) entry.getValue(), b, indentSize + 2);
        } else {
            // Leaf field: print its primitive type name.
            b.append(entry.getValue());
            b.append("\n");
        }
    }
}

/**
 * Recursively collects every field reachable from {@code schema} -- including
 * fields of nested schemas and of schema-valued array elements -- into
 * {@code fields}, preserving encounter order.
 */
private static void populateSchemaFields(Schema schema, Set<Field> fields) {
    for (Field field: schema.fields()) {
        fields.add(field);
        // Unwrap one level of array, then recurse if a nested schema remains.
        Type fieldType = field.type;
        if (fieldType instanceof ArrayOf)
            fieldType = ((ArrayOf) fieldType).type();
        if (fieldType instanceof Schema)
            populateSchemaFields((Schema) fieldType, fields);
    }
}

/**
 * Appends an HTML table documenting every field reachable from {@code schema}
 * (field name and its doc string), including fields of nested schemas.
 */
private static void schemaToFieldTableHtml(Schema schema, StringBuilder b) {
    Set<Field> fields = new LinkedHashSet<>();
    populateSchemaFields(schema, fields);

    b.append("<table class=\"data-table\"><tbody>\n");
    b.append("<tr>");
    b.append("<th>Field</th>\n");
    b.append("<th>Description</th>\n");
    b.append("</tr>");
    for (Field field : fields) {
        b.append("<tr>\n");
        b.append("<td>");
        b.append(field.name);
        b.append("</td>");
        b.append("<td>");
        b.append(field.doc);
        b.append("</td>");
        b.append("</tr>\n");
    }
    // Close <tbody> explicitly so the emitted markup is balanced.
    b.append("</tbody></table>\n");
}

/**
 * Generates the complete protocol documentation page: the request/response
 * headers followed by, for each API key, the per-version request and response
 * schemas in BNF form with accompanying field tables.
 *
 * @return the documentation as an HTML fragment
 */
public static String toHtml() {
    final StringBuilder b = new StringBuilder();
    b.append("<h5>Headers:</h5>\n");

    b.append("<pre>");
    b.append("Request Header => ");
    schemaToBnfHtml(REQUEST_HEADER, b, 2);
    b.append("</pre>\n");
    schemaToFieldTableHtml(REQUEST_HEADER, b);

    b.append("<pre>");
    b.append("Response Header => ");
    schemaToBnfHtml(RESPONSE_HEADER, b, 2);
    b.append("</pre>\n");
    schemaToFieldTableHtml(RESPONSE_HEADER, b);

    for (ApiKeys key : ApiKeys.values()) {
        // Per-API section header.
        b.append("<h5>");
        b.append(key.name);
        b.append(" API (Key: ");
        b.append(key.id);
        b.append("):</h5>\n\n");
        // The request and response sections were duplicated inline; they are
        // now rendered by a shared helper.
        appendSchemaVersions(b, key, "Request", REQUESTS[key.id]);
        appendSchemaVersions(b, key, "Response", RESPONSES[key.id]);
    }

    return b.toString();
}

/**
 * Appends one "Requests:"/"Responses:" section for {@code key}, rendering each
 * non-null schema version as BNF plus a field table. Null entries (unsupported
 * versions) are skipped entirely.
 */
private static void appendSchemaVersions(StringBuilder b, ApiKeys key, String kind, Schema[] schemas) {
    b.append("<b>").append(kind).append("s:</b><br>\n");
    for (int version = 0; version < schemas.length; version++) {
        Schema schema = schemas[version];
        if (schema != null) {
            b.append("<p>");
            // Version header
            b.append("<pre>");
            b.append(key.name);
            b.append(" ").append(kind).append(" (Version: ");
            b.append(version);
            b.append(") => ");
            schemaToBnfHtml(schema, b, 2);
            b.append("</pre>");
            schemaToFieldTableHtml(schema, b);
            // Fix: close the paragraph only when one was opened. Previously
            // "</p>" was appended outside the null check, emitting an unmatched
            // closing tag for every null schema version.
            b.append("</p>\n");
        }
    }
}

/** Command-line entry point used by the build to print the protocol docs to stdout. */
public static void main(String[] args) {
    final String html = toHtml();
    System.out.println(html);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,8 @@ public Type type() {
return type;
}

/** Returns the schema associated with this field. */
public Schema schema() {
    return this.schema;
}

}
8 changes: 4 additions & 4 deletions docs/configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ <h3><a id="brokerconfigs" href="#brokerconfigs">3.1 Broker Configs</a></h3>

Topic-level configurations and defaults are discussed in more detail <a href="#topic-config">below</a>.

<!--#include virtual="kafka_config.html" -->
<!--#include virtual="generated/kafka_config.html" -->

<p>More details about broker configuration can be found in the scala class <code>kafka.server.KafkaConfig</code>.</p>

Expand Down Expand Up @@ -150,7 +150,7 @@ <h3><a id="brokerconfigs" href="#brokerconfigs">3.1 Broker Configs</a></h3>
<h3><a id="producerconfigs" href="#producerconfigs">3.2 Producer Configs</a></h3>

Below is the configuration of the Java producer:
<!--#include virtual="producer_config.html" -->
<!--#include virtual="generated/producer_config.html" -->

<p>
For those interested in the legacy Scala producer configs, information can be found <a href="http://kafka.apache.org/082/documentation.html#producerconfigs">
Expand Down Expand Up @@ -330,7 +330,7 @@ <h4><a id="oldconsumerconfigs" href="#oldconsumerconfigs">3.3.1 Old Consumer Con

<h4><a id="newconsumerconfigs" href="#newconsumerconfigs">3.3.2 New Consumer Configs</a></h4>
Since 0.9.0.0 we have been working on a replacement for our existing simple and high-level consumers. The code is considered beta quality. Below is the configuration for the new consumer:
<!--#include virtual="consumer_config.html" -->
<!--#include virtual="generated/consumer_config.html" -->

<h3><a id="connectconfigs" href="#connectconfigs">3.4 Kafka Connect Configs</a></h3>
<!--#include virtual="connect_config.html" -->
<!--#include virtual="generated/connect_config.html" -->
Loading