From e65c697a2a5e4623101011c6752e753706a26e12 Mon Sep 17 00:00:00 2001
From: Mickael Maison
Date: Mon, 3 Jan 2022 19:13:13 +0100
Subject: [PATCH 1/2] MINOR: Improve Connect docs

- Fix indentation of code blocks
- Add links to all SMTs and Predicates
---
 .../kafka/connect/tools/PredicateDoc.java     |   2 +-
 .../connect/tools/TransformationDoc.java      |   2 +-
 docs/connect.html                             | 360 +++++++++---------
 3 files changed, 182 insertions(+), 182 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
index d4399d6cb9ac1..dedbe13c8fe42 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
@@ -61,7 +61,7 @@ private static void printPredicateHtml(PrintStream out, DocInfo docInfo) {
         out.println("
"); out.print("
"); - out.print(docInfo.predicateName); + out.print("" + docInfo.predicateName + ""); out.println("
"); out.println(docInfo.overview); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java index 5771a6b0ed757..543703a13ac07 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java @@ -75,7 +75,7 @@ private static void printTransformationHtml(PrintStream out, DocInfo docInfo) { out.println("
"); out.print("
"); - out.print(docInfo.transformationName); + out.print("" + docInfo.transformationName + ""); out.println("
"); out.println(docInfo.overview); diff --git a/docs/connect.html b/docs/connect.html index 07f8778f002f2..6cc374f9bcb03 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -41,7 +41,7 @@

Running Kafka Connect

In standalone mode all work is performed in a single process. This configuration is simpler to set up and get started with and may be useful in situations where only one worker makes sense (e.g. collecting log files), but it does not benefit from some of the features of Kafka Connect such as fault tolerance. You can start a standalone process with the following command:

-    > bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
+> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
     

The first parameter is the configuration for the worker. This includes settings such as the Kafka connection parameters, serialization format, and how frequently to commit offsets. The provided example should work well with a local cluster running with the default configuration provided by config/server.properties. It will require tweaking to use with a different configuration or production deployment. All workers (both standalone and distributed) require a few configs:
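As a hedged illustration of the common worker configs mentioned above (the broker address and file path are placeholders, and the converter choice is just one option), a minimal standalone worker configuration might look like:

```properties
# Kafka brokers to bootstrap from (placeholder address)
bootstrap.servers=localhost:9092

# Converters define the serialization format for record keys and values
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Standalone mode stores source offsets in a local file (placeholder path)
offset.storage.file.filename=/tmp/connect.offsets
```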

@@ -65,7 +65,7 @@

Running Kafka Connect

Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both in the active tasks and for configuration and offset commit data. Execution is very similar to standalone mode:

-    > bin/connect-distributed.sh config/connect-distributed.properties
+> bin/connect-distributed.sh config/connect-distributed.properties
     

The difference is in the class which is started and the configuration parameters which change how the Kafka Connect process decides where to store configurations, how to assign work, and where to store offsets and task statuses. In distributed mode, Kafka Connect stores the offsets, configs and task statuses in Kafka topics. It is recommended to manually create the topics for offsets, configs and statuses in order to achieve the desired number of partitions and replication factors. If the topics are not yet created when starting Kafka Connect, they will be auto-created with the default number of partitions and replication factor, which may not be best suited for their usage.
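For illustration, the internal-topic settings referred to above can be set explicitly in the distributed worker config. The topic names and sizing below are examples only, not values you must use:

```properties
# Unique name of this Connect cluster
group.id=connect-cluster

# Internal topics; create these manually with the desired sizing
config.storage.topic=connect-configs
config.storage.replication.factor=3

offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25

status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5
```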

@@ -122,8 +122,8 @@

Transformations

Throughout the example we'll use the schemaless JSON data format. To use the schemaless format, we changed the following two lines in connect-standalone.properties from true to false:

-        key.converter.schemas.enable
-        value.converter.schemas.enable
+key.converter.schemas.enable
+value.converter.schemas.enable
     

The file source connector reads each line as a String. We will wrap each line in a Map and then add a second field to identify the origin of the event. To do this, we use two transformations:

@@ -135,17 +135,17 @@

Transformations

After adding the transformations, the connect-file-source.properties file looks as follows:

-        name=local-file-source
-        connector.class=FileStreamSource
-        tasks.max=1
-        file=test.txt
-        topic=connect-test
-        transforms=MakeMap, InsertSource
-        transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
-        transforms.MakeMap.field=line
-        transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
-        transforms.InsertSource.static.field=data_source
-        transforms.InsertSource.static.value=test-file-source
+name=local-file-source
+connector.class=FileStreamSource
+tasks.max=1
+file=test.txt
+topic=connect-test
+transforms=MakeMap, InsertSource
+transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
+transforms.MakeMap.field=line
+transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
+transforms.InsertSource.static.field=data_source
+transforms.InsertSource.static.value=test-file-source
     

All the lines starting with transforms were added for the transformations. You can see the two transformations we created: "InsertSource" and "MakeMap" are aliases that we chose to give the transformations. The transformation types are based on the list of built-in transformations you can see below. Each transformation type has additional configuration: HoistField requires a configuration called "field", which is the name of the field in the map that will include the original String from the file. InsertField transformation lets us specify the field name and the value that we are adding.

@@ -153,17 +153,17 @@

Transformations

When we ran the file source connector on the sample file without the transformations, and then read the records using kafka-console-consumer.sh, the results were:

-        "foo"
-        "bar"
-        "hello world"
+"foo"
+"bar"
+"hello world"
    

We then create a new file connector, after adding the transformations to the configuration file. This time, the results will be:

-        {"line":"foo","data_source":"test-file-source"}
-        {"line":"bar","data_source":"test-file-source"}
-        {"line":"hello world","data_source":"test-file-source"}
+{"line":"foo","data_source":"test-file-source"}
+{"line":"bar","data_source":"test-file-source"}
+{"line":"hello world","data_source":"test-file-source"}
     

You can see that the lines we've read are now part of a JSON map, and there is an extra field with the static value we specified. This is just one example of what you can do with transformations.

@@ -217,33 +217,33 @@

Predicates

To do this we first need to filter out the records destined for the topic 'foo'. The Filter transformation removes records from further processing, and can use the TopicNameMatches predicate to apply the transformation only to records in topics which match a certain regular expression. TopicNameMatches's only configuration property is pattern, a Java regular expression for matching against the topic name. The configuration would look like this:

-        transforms=Filter
-        transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
-        transforms.Filter.predicate=IsFoo
+transforms=Filter
+transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
+transforms.Filter.predicate=IsFoo
 
-        predicates=IsFoo
-        predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
-        predicates.IsFoo.pattern=foo
+predicates=IsFoo
+predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
+predicates.IsFoo.pattern=foo
     

Next we need to apply ExtractField only when the topic name of the record is not 'bar'. We can't just use TopicNameMatches directly, because that would apply the transformation to matching topic names, not topic names which do not match. The transformation's implicit negate config property allows us to invert the set of records which a predicate matches. Adding the configuration for this to the previous example we arrive at:

-        transforms=Filter,Extract
-        transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
-        transforms.Filter.predicate=IsFoo
+transforms=Filter,Extract
+transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
+transforms.Filter.predicate=IsFoo
 
-        transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
-        transforms.Extract.field=other_field
-        transforms.Extract.predicate=IsBar
-        transforms.Extract.negate=true
+transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
+transforms.Extract.field=other_field
+transforms.Extract.predicate=IsBar
+transforms.Extract.negate=true
 
-        predicates=IsFoo,IsBar
-        predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
-        predicates.IsFoo.pattern=foo
+predicates=IsFoo,IsBar
+predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
+predicates.IsFoo.pattern=foo
 
-        predicates.IsBar.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
-        predicates.IsBar.pattern=bar
+predicates.IsBar.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
+predicates.IsBar.pattern=bar
     

Kafka Connect includes the following predicates:

@@ -266,7 +266,7 @@

REST API

For example:

-        listeners=http://localhost:8080,https://localhost:8443
+listeners=http://localhost:8080,https://localhost:8443
     

By default, if no listeners are specified, the REST server runs on port 8083 using the HTTP protocol. When using HTTPS, the configuration has to include the SSL configuration.

@@ -346,35 +346,35 @@
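As a sketch of what the SSL configuration for an HTTPS listener might look like (keystore path and passwords are placeholders; the listeners.https. prefix scopes the settings to the HTTPS listener):

```properties
listeners=https://localhost:8443

# Placeholder keystore details for the HTTPS listener
listeners.https.ssl.keystore.location=/path/to/keystore.jks
listeners.https.ssl.keystore.password=keystore-password
listeners.https.ssl.key.password=key-password
```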

Error Reporting

By default, connectors exhibit "fail fast" behavior immediately upon an error or exception. This is equivalent to adding the following configuration properties, with their defaults, to a connector configuration:

-        # disable retries on failure
-        errors.retry.timeout=0
+# disable retries on failure
+errors.retry.timeout=0
 
-        # do not log the error and their contexts
-        errors.log.enable=false
+# do not log the error and their contexts
+errors.log.enable=false
 
-        # do not record errors in a dead letter queue topic
-        errors.deadletterqueue.topic.name=
+# do not record errors in a dead letter queue topic
+errors.deadletterqueue.topic.name=
 
-        # Fail on first error
-        errors.tolerance=none
+# Fail on first error
+errors.tolerance=none
     

These and other related connector configuration properties can be changed to provide different behavior. For example, the following configuration properties can be added to a connector configuration to set up error handling with multiple retries, logging to the application logs and the my-connector-errors Kafka topic, and tolerating all errors by reporting them rather than failing the connector task:

-        # retry for at most 10 minutes times waiting up to 30 seconds between consecutive failures
-        errors.retry.timeout=600000
-        errors.retry.delay.max.ms=30000
+# retry for at most 10 minutes, waiting up to 30 seconds between consecutive failures
+errors.retry.timeout=600000
+errors.retry.delay.max.ms=30000
 
-        # log error context along with application logs, but do not include configs and messages
-        errors.log.enable=true
-        errors.log.include.messages=false
+# log error context along with application logs, but do not include configs and messages
+errors.log.enable=true
+errors.log.include.messages=false
 
-        # produce error context into the Kafka topic
-        errors.deadletterqueue.topic.name=my-connector-errors
+# produce error context into the Kafka topic
+errors.deadletterqueue.topic.name=my-connector-errors
 
-        # Tolerate all errors.
-        errors.tolerance=all
+# Tolerate all errors.
+errors.tolerance=all
     

8.3 Connector Development Guide

@@ -414,34 +414,34 @@

Connector

We'll cover the SourceConnector as a simple example. SinkConnector implementations are very similar. Start by creating the class that inherits from SourceConnector and add a couple of fields that will store parsed configuration information (the filename to read from and the topic to send data to):

-    public class FileStreamSourceConnector extends SourceConnector {
-        private String filename;
-        private String topic;
+public class FileStreamSourceConnector extends SourceConnector {
+    private String filename;
+    private String topic;
     

The easiest method to fill in is taskClass(), which defines the class that should be instantiated in worker processes to actually read the data:

-    @Override
-    public Class<? extends Task> taskClass() {
-        return FileStreamSourceTask.class;
-    }
+@Override
+public Class<? extends Task> taskClass() {
+    return FileStreamSourceTask.class;
+}
     

We will define the FileStreamSourceTask class below. Next, we add some standard lifecycle methods, start() and stop():

-    @Override
-    public void start(Map<String, String> props) {
-        // The complete version includes error handling as well.
-        filename = props.get(FILE_CONFIG);
-        topic = props.get(TOPIC_CONFIG);
-    }
-
-    @Override
-    public void stop() {
-        // Nothing to do since no background monitoring is required.
-    }
+@Override
+public void start(Map<String, String> props) {
+    // The complete version includes error handling as well.
+    filename = props.get(FILE_CONFIG);
+    topic = props.get(TOPIC_CONFIG);
+}
+
+@Override
+public void stop() {
+    // Nothing to do since no background monitoring is required.
+}
     

Finally, the real core of the implementation is in taskConfigs(). In this case we are only

@@ -449,17 +449,17 @@

Connector maxTasks argument, we return a list with only one entry:

-    @Override
-    public List<Map<String, String>> taskConfigs(int maxTasks) {
-        ArrayList<Map<String, String>> configs = new ArrayList<>();
-        // Only one input stream makes sense.
-        Map<String, String> config = new HashMap<>();
-        if (filename != null)
-            config.put(FILE_CONFIG, filename);
-        config.put(TOPIC_CONFIG, topic);
-        configs.add(config);
-        return configs;
-    }
+@Override
+public List<Map<String, String>> taskConfigs(int maxTasks) {
+    ArrayList<Map<String, String>> configs = new ArrayList<>();
+    // Only one input stream makes sense.
+    Map<String, String> config = new HashMap<>();
+    if (filename != null)
+        config.put(FILE_CONFIG, filename);
+    config.put(TOPIC_CONFIG, topic);
+    configs.add(config);
+    return configs;
+}
     

Although not used in the example, SourceTask also provides two APIs to commit offsets in the source system: commit and commitRecord. The APIs are provided for source systems which have an acknowledgement mechanism for messages. Overriding these methods allows the source connector to acknowledge messages in the source system, either in bulk or individually, once they have been written to Kafka.

@@ -477,22 +477,22 @@

Task Example - Source
-    public class FileStreamSourceTask extends SourceTask {
-        String filename;
-        InputStream stream;
-        String topic;
-
-        @Override
-        public void start(Map<String, String> props) {
-            filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
-            stream = openOrThrowError(filename);
-            topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
-        }
+public class FileStreamSourceTask extends SourceTask {
+    String filename;
+    InputStream stream;
+    String topic;
 
-        @Override
-        public synchronized void stop() {
-            stream.close();
-        }
+    @Override
+    public void start(Map<String, String> props) {
+        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
+        stream = openOrThrowError(filename);
+        topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
+    }
+
+    @Override
+    public synchronized void stop() {
+        stream.close();
+    }
     

These are slightly simplified versions, but show that these methods should be relatively simple and the only work they should perform is allocating or freeing resources. There are two points to note about this implementation. First, the start() method does not yet handle resuming from a previous offset, which will be addressed in a later section. Second, the stop() method is synchronized. This will be necessary because SourceTasks are given a dedicated thread which they can block indefinitely, so they need to be stopped with a call from a different thread in the Worker.

@@ -500,27 +500,27 @@
Task Example - Source

Next, we implement the main functionality of the task, the poll() method which gets events from the input system and returns a List<SourceRecord>:

-    @Override
-    public List<SourceRecord> poll() throws InterruptedException {
-        try {
-            ArrayList<SourceRecord> records = new ArrayList<>();
-            while (streamValid(stream) && records.isEmpty()) {
-                LineAndOffset line = readToNextLine(stream);
-                if (line != null) {
-                    Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
-                    Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
-                    records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
-                } else {
-                    Thread.sleep(1);
-                }
+@Override
+public List<SourceRecord> poll() throws InterruptedException {
+    try {
+        ArrayList<SourceRecord> records = new ArrayList<>();
+        while (streamValid(stream) && records.isEmpty()) {
+            LineAndOffset line = readToNextLine(stream);
+            if (line != null) {
+                Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
+                Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
+                records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
+            } else {
+                Thread.sleep(1);
             }
-            return records;
-        } catch (IOException e) {
-            // Underlying stream was killed, probably as a result of calling stop. Allow to return
-            // null, and driving thread will handle any shutdown if necessary.
         }
-        return null;
+        return records;
+    } catch (IOException e) {
+        // Underlying stream was killed, probably as a result of calling stop. Allow to return
+        // null, and driving thread will handle any shutdown if necessary.
     }
+    return null;
+}
     

Again, we've omitted some details, but we can see the important steps: the poll() method is going to be called repeatedly, and for each call it will loop trying to read records from the file. For each line it reads, it also tracks the file offset. It uses this information to create an output SourceRecord with four pieces of information: the source partition (there is only one, the single file being read), source offset (byte offset in the file), output topic name, and output value (the line, and we include a schema indicating this value will always be a string). Other variants of the SourceRecord constructor can also include a specific output partition, a key, and headers.

@@ -532,15 +532,15 @@
Sink Tasks

The previous section described how to implement a simple SourceTask. Unlike SourceConnector and SinkConnector, SourceTask and SinkTask have very different interfaces because SourceTask uses a pull interface and SinkTask uses a push interface. Both share the common lifecycle methods, but the SinkTask interface is quite different:

-    public abstract class SinkTask implements Task {
-        public void initialize(SinkTaskContext context) {
-            this.context = context;
-        }
+public abstract class SinkTask implements Task {
+    public void initialize(SinkTaskContext context) {
+        this.context = context;
+    }
 
-        public abstract void put(Collection<SinkRecord> records);
+    public abstract void put(Collection<SinkRecord> records);
 
-        public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
-        }
+    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+    }
     

The SinkTask documentation contains full details, but this interface is nearly as simple as the SourceTask. The put() method should contain most of the implementation, accepting sets of SinkRecords, performing any required translation, and storing them in the destination system. This method does not need to ensure the data has been fully written to the destination system before returning. In fact, in many cases internal buffering will be useful so an entire batch of records can be sent at once, reducing the overhead of inserting events into the downstream data store. The SinkRecords contain essentially the same information as SourceRecords: Kafka topic, partition, offset, the event key and value, and optional headers.

@@ -553,36 +553,36 @@
Err

When error reporting is enabled for a connector, the connector can use an ErrantRecordReporter to report problems with individual records sent to a sink connector. The following example shows how a connector's SinkTask subclass might obtain and use the ErrantRecordReporter, safely handling a null reporter when the DLQ is not enabled or when the connector is installed in an older Connect runtime that doesn't have this reporter feature:

-        private ErrantRecordReporter reporter;
-
-        @Override
-        public void start(Map<String, String> props) {
-            ...
-            try {
-                reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
-            } catch (NoSuchMethodException | NoClassDefFoundError e) {
-                // Will occur in Connect runtimes earlier than 2.6
-                reporter = null;
-            }
-        }
+private ErrantRecordReporter reporter;
+
+@Override
+public void start(Map<String, String> props) {
+    ...
+    try {
+        reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
+    } catch (NoSuchMethodException | NoClassDefFoundError e) {
+        // Will occur in Connect runtimes earlier than 2.6
+        reporter = null;
+    }
+}
 
-        @Override
-        public void put(Collection<SinkRecord> records) {
-            for (SinkRecord record: records) {
-                try {
-                    // attempt to process and send record to data sink
-                    process(record);
-                } catch(Exception e) {
-                    if (reporter != null) {
-                        // Send errant record to error reporter
-                        reporter.report(record, e);
-                    } else {
-                        // There's no error reporter, so fail
-                        throw new ConnectException("Failed on record", e);
-                    }
-                }
+@Override
+public void put(Collection<SinkRecord> records) {
+    for (SinkRecord record: records) {
+        try {
+            // attempt to process and send record to data sink
+            process(record);
+        } catch(Exception e) {
+            if (reporter != null) {
+                // Send errant record to error reporter
+                reporter.report(record, e);
+            } else {
+                // There's no error reporter, so fail
+                throw new ConnectException("Failed on record", e);
             }
         }
+    }
+}
     
Resuming from Previous Offsets
@@ -592,13 +592,13 @@
Resuming from Previous Offsets

To correctly resume upon startup, the task can use the SourceContext passed into its initialize() method to access the offset data. In initialize(), we would add a bit more code to read the offset (if it exists) and seek to that position:

-        stream = new FileInputStream(filename);
-        Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
-        if (offset != null) {
-            Long lastRecordedOffset = (Long) offset.get("position");
-            if (lastRecordedOffset != null)
-                seekToOffset(stream, lastRecordedOffset);
-        }
+stream = new FileInputStream(filename);
+Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
+if (offset != null) {
+    Long lastRecordedOffset = (Long) offset.get("position");
+    if (lastRecordedOffset != null)
+        seekToOffset(stream, lastRecordedOffset);
+}
     

Of course, you might need to read many keys for each of the input streams. The OffsetStorageReader interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.
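The bulk-load-then-seek step can be sketched as follows. BulkOffsetSeek and the storedOffsets map are illustrative stand-ins for the result of a bulk OffsetStorageReader read, not the real API:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: given offsets loaded in bulk (one map entry per file),
// compute the position each input stream should be seeked to.
public class BulkOffsetSeek {
    public static Map<String, Long> positionsFor(
            Collection<String> filenames,
            Map<String, Map<String, Object>> storedOffsets) {
        Map<String, Long> positions = new HashMap<>();
        for (String filename : filenames) {
            Map<String, Object> offset = storedOffsets.get(filename);
            long position = 0L; // default: start of file when nothing was stored
            if (offset != null && offset.get("position") != null)
                position = (Long) offset.get("position");
            positions.put(filename, position);
        }
        return positions;
    }
}
```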

@@ -610,8 +610,8 @@

Dynamic Input/Output Streams

Source connectors need to monitor the source system for changes, e.g. table additions/deletions in a database. When they pick up changes, they should notify the framework via the ConnectorContext object that reconfiguration is necessary. For example, in a SourceConnector:

-        if (inputsChanged())
-            this.context.requestTaskReconfiguration();
+if (inputsChanged())
+    this.context.requestTaskReconfiguration();
     

The framework will promptly request new configuration information and update the tasks, allowing them to gracefully commit their progress before reconfiguring them. Note that in the SourceConnector this monitoring is currently left up to the connector implementation. If an extra thread is required to perform this monitoring, the connector must allocate it itself.
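The change-detection part of such a connector-owned monitoring thread can be sketched with plain JDK types. InputChangeMonitor is hypothetical, and the Runnable callback stands in for context.requestTaskReconfiguration(), which is only available inside a running connector:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: compare the current set of inputs (e.g. database tables)
// against the last observed set, and fire the reconfiguration callback on change.
public class InputChangeMonitor {
    private Set<String> lastInputs;
    private final Runnable reconfigure; // stand-in for context.requestTaskReconfiguration()

    public InputChangeMonitor(Set<String> initialInputs, Runnable reconfigure) {
        this.lastInputs = new HashSet<>(initialInputs);
        this.reconfigure = reconfigure;
    }

    // Called periodically from the connector's monitoring thread.
    public boolean poll(Set<String> currentInputs) {
        if (currentInputs.equals(lastInputs))
            return false;
        lastInputs = new HashSet<>(currentInputs);
        reconfigure.run();
        return true;
    }
}
```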

@@ -627,13 +627,13 @@

Connect Configuration Validation

The following code in FileStreamSourceConnector defines the configuration and exposes it to the framework.

-        private static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
-            .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
+private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
+    .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
 
-        public ConfigDef config() {
-            return CONFIG_DEF;
-        }
+public ConfigDef config() {
+    return CONFIG_DEF;
+}
     

The ConfigDef class is used for specifying the set of expected configurations. For each configuration, you can specify the name, the type, the default value, the documentation, the group information, the order in the group, the width of the configuration value and the name suitable for display in the UI. Plus, you can provide special validation logic used for single configuration validation by overriding the Validator class. Moreover, there may be dependencies between configurations; for example, the valid values and visibility of a configuration may change according to the values of other configurations. To handle this, ConfigDef allows you to specify the dependents of a configuration and to provide an implementation of Recommender to get valid values and set visibility of a configuration given the current configuration values.
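As an illustration of the single-property validation contract: the ensureValid(name, value) shape below mirrors what a ConfigDef Validator does, but the RequireNonEmpty class itself is a hypothetical stand-in, not a Connect class:

```java
// Hypothetical validator mirroring the Validator contract:
// ensureValid(name, value) throws when a value is unacceptable.
public class RequireNonEmpty {
    public static void ensureValid(String name, Object value) {
        if (!(value instanceof String) || ((String) value).isEmpty())
            throw new IllegalArgumentException("Invalid value for configuration " + name);
    }
}
```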

@@ -649,15 +649,15 @@

Working with Schemas

The API documentation provides a complete reference, but here is a simple example creating a Schema and Struct:

-    Schema schema = SchemaBuilder.struct().name(NAME)
-        .field("name", Schema.STRING_SCHEMA)
-        .field("age", Schema.INT_SCHEMA)
-        .field("admin", SchemaBuilder.bool().defaultValue(false).build())
-        .build();
-
-    Struct struct = new Struct(schema)
-        .put("name", "Barbara Liskov")
-        .put("age", 75);
+Schema schema = SchemaBuilder.struct().name(NAME)
+    .field("name", Schema.STRING_SCHEMA)
+    .field("age", Schema.INT_SCHEMA)
+    .field("admin", SchemaBuilder.bool().defaultValue(false).build())
+    .build();
+
+Struct struct = new Struct(schema)
+    .put("name", "Barbara Liskov")
+    .put("age", 75);
     

If you are implementing a source connector, you'll need to decide when and how to create schemas. You should avoid recomputing them wherever possible. For example, if your connector is guaranteed to have a fixed schema, create it statically and reuse a single instance.

@@ -709,7 +709,7 @@

Kafka Connect

-    {
+{
     "name": "file-source",
     "connector": {
         "state": "RUNNING",
@@ -722,7 +722,7 @@ 

Kafka Connect "worker_id": "192.168.1.209:8083" } ] - } +}

From 65f339401af838f6fbd24cc596a81f2810258cfe Mon Sep 17 00:00:00 2001
From: Mickael Maison
Date: Tue, 4 Jan 2022 09:35:53 +0100
Subject: [PATCH 2/2] Inline tags to avoid extra empty lines

---
 docs/connect.html | 72 ++++++++++++++++------------------------------
 1 file changed, 24 insertions(+), 48 deletions(-)

diff --git a/docs/connect.html b/docs/connect.html
index 6cc374f9bcb03..66d621248dec5 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -41,8 +41,7 @@

Running Kafka Connect

In standalone mode all work is performed in a single process. This configuration is simpler to set up and get started with and may be useful in situations where only one worker makes sense (e.g. collecting log files), but it does not benefit from some of the features of Kafka Connect such as fault tolerance. You can start a standalone process with the following command:

-> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
-    
+> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

The first parameter is the configuration for the worker. This includes settings such as the Kafka connection parameters, serialization format, and how frequently to commit offsets. The provided example should work well with a local cluster running with the default configuration provided by config/server.properties. It will require tweaking to use with a different configuration or production deployment. All workers (both standalone and distributed) require a few configs: