diff --git a/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml b/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml
index 2e5b0267b0ae7..1d33c569cfcb6 100644
--- a/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml
+++ b/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml
@@ -32,6 +32,7 @@ body:
- apache-druid
- apache-hdfs
- apache-hive
+ - apache-kafka
- apache-kylin
- apache-livy
- apache-pig
diff --git a/airflow/providers/apache/kafka/CHANGELOG.rst b/airflow/providers/apache/kafka/CHANGELOG.rst
new file mode 100644
index 0000000000000..cef7dda80708a
--- /dev/null
+++ b/airflow/providers/apache/kafka/CHANGELOG.rst
@@ -0,0 +1,25 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Changelog
+---------
+
+1.0.0
+.....
+
+Initial version of the provider.
diff --git a/airflow/providers/apache/kafka/__init__.py b/airflow/providers/apache/kafka/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/airflow/providers/apache/kafka/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/kafka/hooks/__init__.py b/airflow/providers/apache/kafka/hooks/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/airflow/providers/apache/kafka/hooks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/kafka/hooks/kafka.py b/airflow/providers/apache/kafka/hooks/kafka.py
new file mode 100644
index 0000000000000..f2c14bb16f0ed
--- /dev/null
+++ b/airflow/providers/apache/kafka/hooks/kafka.py
@@ -0,0 +1,190 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from enum import Enum
+from types import SimpleNamespace
+
+from kafka import BrokerConnection, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
+
+from airflow.hooks.base import BaseHook
+
+
+class KafkaHookClient:
+ """Simple wrapper of Kafka classes"""
+
+ def __init__(self, **kwargs):
+ """
+ Take and save configs that common for Kafka classes
+
+ 'bootstrap_servers'
+ 'client_id'
+ """
+ self.configs = kwargs
+
+ def create_broker_connection(self, **kwargs) -> BrokerConnection:
+ """Returns BrokerConnection instance"""
+ broker_connection_conf = dict(self.configs, **kwargs)
+ return BrokerConnection(**broker_connection_conf)
+
+ def create_internal_client(self, **kwargs) -> KafkaClient:
+ """Returns KafkaClient instance"""
+ internal_client_conf = dict(self.configs, **kwargs)
+ return KafkaClient(**internal_client_conf)
+
+ def create_admin_client(self, **kwargs) -> KafkaAdminClient:
+ """Returns KafkaAdminClient instance"""
+ admin_client_conf = dict(self.configs, **kwargs)
+ return KafkaAdminClient(**admin_client_conf)
+
+ def create_producer(self, **kwargs) -> KafkaProducer:
+ """
+ Returns KafkaProducer instance
+
+ Valid parameters:
+ key_serializer: None
+ value_serializer: None
+ acks: 1
+ bootstrap_topics_filter: set()
+ compression_type: None
+ retries: 0
+ batch_size: 16384
+ linger_ms: 0
+ buffer_memory: 33554432
+ max_block_ms: 60000
+ max_request_size: 1048576
+ partitioner: DefaultPartitioner()
+ """
+ producer_conf = dict(self.configs, **kwargs)
+ return KafkaProducer(**producer_conf)
+
+ def create_consumer(self, **kwargs) -> KafkaConsumer:
+ """
+ Returns KafkaConsumer instance.
+
+ Valid arguments:
+ group_id: None
+ key_deserializer: None
+ value_deserializer: None
+ fetch_max_wait_ms: 500
+ fetch_min_bytes: 1
+ fetch_max_bytes: 52428800
+ max_partition_fetch_bytes: 1 * 1024 * 1024
+ max_poll_records: 500
+ max_poll_interval_ms: 300000
+ auto_offset_reset: 'latest'
+ enable_auto_commit: True
+ auto_commit_interval_ms: 5000
+ default_offset_commit_callback: lambda offsets, response: True
+ check_crcs: True
+ session_timeout_ms: 10000
+ heartbeat_interval_ms: 3000
+ consumer_timeout_ms: float('inf')
+ legacy_iterator: False # enable to revert to < 1.4.7 iterator
+ metric_group_prefix: 'consumer'
+ exclude_internal_topics: True
+ partition_assignment_strategy: (RangePartitionAssignor, RoundRobinPartitionAssignor)
+ """
+ consumer_conf = dict(self.configs, **kwargs)
+        return KafkaConsumer(**consumer_conf)
+
+
+class SecurityProtocol(Enum):
+ PLAINTEXT: str = 'PLAINTEXT'
+ SASL_PLAINTEXT: str = 'SASL_PLAINTEXT'
+ SASL_SSL: str = 'SASL_SSL'
+ SSL: str = 'SSL'
+
+
+class SaslMechanism(Enum):
+ PLAIN: str = 'PLAIN'
+ GSSAPI: str = 'GSSAPI'
+ OAUTHBEARER: str = 'OAUTHBEARER'
+ SCRAM_SHA_256: str = 'SCRAM-SHA-256'
+ SCRAM_SHA_512: str = 'SCRAM-SHA-512'
+
+
+class KafkaHook(BaseHook):
+ """
+    Interact with an Apache Kafka cluster using the ``kafka-python`` library.
+    The hook's ``get_conn()`` method returns a ``KafkaHookClient`` that wraps the library classes.
+
+ .. seealso::
+ - https://github.com/dpkp/kafka-python/
+
+ .. seealso::
+        :ref:`howto/connections:kafka`
+
+ :param kafka_conn_id: The connection id to the Kafka cluster
+ """
+
+ conn_name_attr = 'kafka_conn_id'
+ default_conn_name = 'kafka_default'
+ conn_type = 'kafka'
+ hook_name = 'Apache Kafka'
+
+ def __init__(self, kafka_conn_id: str = 'kafka_default') -> None:
+
+ super().__init__()
+ self.kafka_conn_id = kafka_conn_id
+
+ configs = self._get_configs()
+
+ self.client = KafkaHookClient(
+ bootstrap_servers=self.get_conn_url(),
+ client_id='apache-airflow-kafka-hook',
+ **configs,
+ )
+
+ def get_conn(self) -> KafkaHookClient:
+ """Returns a connection object"""
+ return self.client
+
+ def get_conn_url(self) -> str:
+ """Get Kafka connection url"""
+ conn = self.get_connection(self.kafka_conn_id)
+
+ host = 'localhost' if not conn.host else conn.host
+ port = 9092 if not conn.port else conn.port
+
+ servers = map(lambda h: h if ':' in h else f'{h}:{port}', host.split(','))
+
+ return ','.join(servers)
+
+ def _get_configs(self) -> dict:
+ """Generates configs for Kafka classes"""
+ conn = self.get_connection(self.kafka_conn_id)
+ configs = conn.extra_dejson.copy()
+
+        configs['security_protocol'] = SecurityProtocol(conn.schema.upper()).value
+
+        if configs['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
+            # Normalize the mechanism through the enum but keep it as a plain string
+            # so that the comparisons below work.
+            mechanism = configs.get('sasl_mechanism', 'PLAIN').upper()
+            configs['sasl_mechanism'] = SaslMechanism(mechanism).value
+            if configs['sasl_mechanism'] == 'OAUTHBEARER':
+                # kafka-python expects an object exposing a ``token()`` method;
+                # a bare ``object()`` cannot carry attributes, so use a namespace.
+                configs['sasl_oauth_token_provider'] = SimpleNamespace(token=lambda: conn.password)
+            elif configs['sasl_mechanism'] in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'):
+                configs['sasl_plain_username'] = conn.login
+                configs['sasl_plain_password'] = conn.password
+        elif configs['security_protocol'] == 'SSL':
+            configs['ssl_keyfile'] = conn.login
+            configs['ssl_password'] = conn.password
+
+ return configs
+
+
+__all__ = ['KafkaHook']
diff --git a/airflow/providers/apache/kafka/provider.yaml b/airflow/providers/apache/kafka/provider.yaml
new file mode 100644
index 0000000000000..52e3e81e9cba2
--- /dev/null
+++ b/airflow/providers/apache/kafka/provider.yaml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-apache-kafka
+name: Apache Kafka
+description: |
+  `Apache Kafka <https://kafka.apache.org/>`__
+
+versions:
+ - 1.0.0
+
+additional-dependencies:
+ - apache-airflow>=2.1.0
+
+integrations:
+ - integration-name: Apache Kafka
+ external-doc-url: https://kafka.apache.org/
+ logo: /integration-logos/apache/kafka.png
+ tags: [apache]
+
+hooks:
+ - integration-name: Apache Kafka
+ python-modules:
+ - airflow.providers.apache.kafka.hooks.kafka
+
+connection-types:
+ - hook-class-name: airflow.providers.apache.kafka.hooks.kafka.KafkaHook
+ connection-type: kafka
diff --git a/airflow/ui/src/views/Docs.tsx b/airflow/ui/src/views/Docs.tsx
index ea42ca6fdf96e..7bed2d2f273ae 100644
--- a/airflow/ui/src/views/Docs.tsx
+++ b/airflow/ui/src/views/Docs.tsx
@@ -45,6 +45,7 @@ const Docs: React.FC = () => {
{ path: 'apache-druid', name: 'Apache Druid' },
{ path: 'apache-hdfs', name: 'Apache HDFS' },
{ path: 'apache-hive', name: 'Apache Hive' },
+ { path: 'apache-kafka', name: 'Apache Kafka' },
{ path: 'apache-kylin', name: 'Apache Kylin' },
{ path: 'apache-livy', name: 'Apache Livy' },
{ path: 'apache-pig', name: 'Apache Pig' },
diff --git a/docs/apache-airflow-providers-apache-kafka/commits.rst b/docs/apache-airflow-providers-apache-kafka/commits.rst
new file mode 100644
index 0000000000000..58968ee35f157
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-kafka/commits.rst
@@ -0,0 +1,27 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Package apache-airflow-providers-apache-kafka
+------------------------------------------------------
+
+`Apache Kafka <https://kafka.apache.org/>`__
+
+
+This is a detailed commit list of changes for the provider package ``apache.kafka``.
+For a high-level changelog, see :doc:`package information including changelog <index>`.
diff --git a/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst b/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst
new file mode 100644
index 0000000000000..72d8d507a919b
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst
@@ -0,0 +1,150 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+.. _howto/connections:kafka:
+
+Kafka Connection
+================
+The Kafka connection type provides a connection to an Apache Kafka cluster.
+
+Configuring the Connection
+--------------------------
+
+Host (required)
+ The host to connect to. Defaults to ``localhost``.
+ It may be a single host (example: ``host1.com``),
+ a comma-separated list of hosts (example: ``host1.com,host2.com``),
+ or a comma-separated list of hosts with ports (example: ``host1.com:9092,host2.com:9092``).
+
+Port (optional)
+ The port to connect to. Defaults to ``9092``.
+ If the host field contains a single host or a comma-separated list of hosts without ports,
+ this port is used for each of them.
+
+Schema (required)
+ Specify the **security protocol** to communicate with brokers. Valid values are:
+
+ * ``PLAINTEXT`` - Un-authenticated, non-encrypted channel.
+ * ``SASL_PLAINTEXT`` - SASL authenticated, non-encrypted channel.
+ * ``SASL_SSL`` - SASL authenticated, SSL channel.
+ * ``SSL`` - SSL channel.
+
+Login (optional)
+ Specify the login used to connect. What to put here depends on the security protocol.
+
+ - ``PLAINTEXT``: leave empty.
+ - ``SASL_PLAINTEXT``: further depends on ``sasl_mechanism``
+
+   * if "PLAIN" or one of the "SCRAM" mechanisms, put the ``sasl_plain_username`` value here;
+   * if "GSSAPI" (kerberos) or "OAUTHBEARER", leave empty;
+
+ - ``SASL_SSL``: further depends on ``sasl_mechanism``
+
+   * if "PLAIN" or one of the "SCRAM" mechanisms, put the ``sasl_plain_username`` value here;
+   * if "GSSAPI" (kerberos) or "OAUTHBEARER", leave empty;
+
+ - ``SSL``: put the ``ssl_keyfile`` value here.
+
+Password (optional)
+ Specify the secret used to connect. What to put here depends on the authentication mechanism.
+
+ - ``PLAINTEXT``: leave empty.
+ - ``SASL_PLAINTEXT``: further depends on ``sasl_mechanism``
+
+   * if "PLAIN" or one of the "SCRAM" mechanisms, put the ``sasl_plain_password`` value here;
+   * if "OAUTHBEARER", put the token value here;
+   * if "GSSAPI" (kerberos), leave empty;
+
+ - ``SASL_SSL``: further depends on ``sasl_mechanism``
+
+   * if "PLAIN" or one of the "SCRAM" mechanisms, put the ``sasl_plain_password`` value here;
+   * if "OAUTHBEARER", put the token value here;
+   * if "GSSAPI" (kerberos), leave empty;
+
+ - ``SSL``: put the ``ssl_password`` value here.
+
+Extra (optional)
+ Specify the extra parameters (as a JSON dictionary) that are passed to the Kafka client classes.
+ The following parameters are supported:
+
+ Connection params:
+
+ * ``request_timeout_ms`` - Client request timeout in milliseconds. By default ``30000``
+ * ``connections_max_idle_ms`` - Close idle connections after this number of milliseconds. By default ``9 * 60 * 1000``
+ * ``reconnect_backoff_ms`` - The amount of time in milliseconds to wait before attempting to reconnect to a given host. By default ``50``
+ * ``reconnect_backoff_max_ms`` - The maximum amount of time in milliseconds to backoff/wait when reconnecting to a broker that has repeatedly failed to connect. By default ``1000``
+ * ``retry_backoff_ms`` - Milliseconds to backoff when retrying on errors. By default ``100``
+ * ``metadata_max_age_ms`` - The period in milliseconds after which a metadata refresh is forced even if no partition leadership changes have been seen. By default ``300000``
+ * ``max_in_flight_requests_per_connection`` - Requests are in pipeline to Apache Kafka brokers up to this number of maximum requests per broker connection. By default ``5``
+ * ``receive_buffer_bytes`` - The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. By default ``None``
+ * ``send_buffer_bytes`` - The size of the TCP send buffer (SO_SNDBUF) to use when sending data. By default ``None``
+ * ``api_version_auto_timeout_ms`` - Number of milliseconds to throw a timeout exception from the constructor when checking the broker api version. By default ``2000``
+ * ``api_version`` - Specify which Kafka API version to use. Accepted values are: ``(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10)``. Default: ``(0, 8, 2)``.
+ * ``metrics_num_samples`` - The number of samples maintained to compute metrics. By default ``2``
+ * ``metrics_sample_window_ms`` - The maximum age in milliseconds of samples used to compute metrics. By default ``30000``
+
+ Additional params for SASL.
+ Specify only when security protocol is ``SASL_PLAINTEXT`` or ``SASL_SSL``.
+
+ * ``sasl_mechanism`` – Authentication mechanism. Valid values are: ``PLAIN``, ``GSSAPI``, ``OAUTHBEARER``, ``SCRAM-SHA-256``, ``SCRAM-SHA-512``.
+ * ``sasl_plain_username`` – username for sasl authentication. Required for ``PLAIN`` or one of the ``SCRAM`` mechanisms. Is taken from login field.
+ * ``sasl_plain_password`` – password for sasl authentication. Required for ``PLAIN`` or one of the ``SCRAM`` mechanisms. Is taken from password field.
+ * ``sasl_kerberos_service_name`` – Service name to include in ``GSSAPI`` sasl mechanism handshake. Default: ``kafka``.
+ * ``sasl_kerberos_domain_name`` – kerberos domain name to use in ``GSSAPI`` sasl mechanism handshake. Default: one of bootstrap servers.
+ * ``sasl_oauth_token_provider`` – ``OAUTHBEARER`` token provider instance. Internally implemented class that returns value of password field.
+
+ Additional params for SSL:
+ Specify only when security protocol is ``SSL`` or ``SASL_SSL``.
+
+ * ``ssl_check_hostname`` – flag to configure whether ssl handshake should verify that the certificate matches the brokers hostname. By default: ``True``.
+ * ``ssl_cafile`` – optional filename of ca file to use in certificate verification. By default: ``None``.
+ * ``ssl_certfile`` – optional filename of file in pem format containing the client certificate, as well as any ca certificates needed to establish the certificate’s authenticity. By default: ``None``.
+ * ``ssl_crlfile`` – optional filename containing the CRL to check for certificate expiration. By default, no CRL check is done, ``None``.
+ * ``ssl_ciphers`` – optionally set the available ciphers for ssl connections. It should be a string in the OpenSSL cipher list format. By default ``None``.
+ * ``ssl_keyfile`` – optional filename containing the client private key. Is taken from ``login`` field for ``SSL`` protocol.
+ * ``ssl_password`` – optional password to decrypt the client private key. Is taken from ``password`` field for ``SSL`` protocol.
+
+
+ More details about these Kafka parameters can be found in the
+ `kafka-python API documentation <https://kafka-python.readthedocs.io/>`_.
+ Example "extras" field:
+
+ .. code-block:: json
+
+ {
+ "ssl_check_hostname": true,
+ "ssl_certfile": "/tmp/client-cert.pem"
+ }
+
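+ These extras, together with any keyword arguments supplied at call time, end up as
+ kafka-python configuration when the hook's client creates a consumer or producer.
+ A minimal usage sketch (the ``kafka_default`` connection id, group id and topic name
+ below are illustrative):
+
+ .. code-block:: python
+
+     from airflow.providers.apache.kafka.hooks.kafka import KafkaHook
+
+     # ``get_conn()`` returns a KafkaHookClient; connection extras are merged
+     # with the keyword arguments given here.
+     client = KafkaHook(kafka_conn_id="kafka_default").get_conn()
+     consumer = client.create_consumer(group_id="my-group", auto_offset_reset="earliest")
+     consumer.subscribe(["my_topic"])
+     for message in consumer:
+         print(message.value)
+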
+ When specifying the connection as URI (in :envvar:`AIRFLOW_CONN_{CONN_ID}` variable) you should specify it
+ following the standard syntax of DB connections, where extras are passed as parameters
+ of the URI (note that all components of the URI should be URL-encoded), like
+ ``kafka://login:PASSWORD@1.1.1.1:9092/schema?extra_param=smth``
+
+ .. code-block:: bash
+
+ export AIRFLOW_CONN_KAFKA_DEFAULT='kafka://:TOKEN@1.1.1.1:9092/SASL_PLAINTEXT?ssl_certfile=%2Ftmp%2Fclient-cert.pem'
+
+ Some client parameters must be ``int`` or ``bool``; in that case, pass a
+ URL-encoded JSON document via the ``__extra__`` parameter:
+
+ .. code-block:: bash
+
+ export AIRFLOW_CONN_KAFKA_DEFAULT=kafka://:TOKEN@1.1.1.1:9092/SASL_PLAINTEXT?__extra__=%7B%22request_timeout_ms%22%3A45%2C%22ssl_check_hostname%22%3A%20true%7D
+
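+ The URL-encoded value can be produced with a short helper, for example
+ (a sketch that uses only the Python standard library; the extras shown are illustrative):
+
+ .. code-block:: python
+
+     import json
+     from urllib.parse import quote
+
+     # Any supported kafka-python parameter can go into the extras dictionary.
+     extras = {"request_timeout_ms": 45, "ssl_check_hostname": True}
+     uri = "kafka://:TOKEN@1.1.1.1:9092/SASL_PLAINTEXT?__extra__=" + quote(json.dumps(extras))
+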
+ More details are available in the `kafka-python documentation <https://kafka-python.readthedocs.io/>`_.
diff --git a/docs/apache-airflow-providers-apache-kafka/index.rst b/docs/apache-airflow-providers-apache-kafka/index.rst
new file mode 100644
index 0000000000000..b9932e9b75378
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-kafka/index.rst
@@ -0,0 +1,79 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+``apache-airflow-providers-apache-kafka``
+=========================================
+
+Content
+-------
+.. toctree::
+ :maxdepth: 1
+ :caption: Guides
+
+ Connection types <connections/kafka>
+
+
+.. toctree::
+ :maxdepth: 1
+ :caption: References
+
+ Python API <_api/airflow/providers/apache/kafka/index>
+ PyPI Repository <https://pypi.org/project/apache-airflow-providers-apache-kafka/>
+ Installing from sources <installing-providers-from-sources>
+
+
+.. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME!
+
+
+.. toctree::
+ :maxdepth: 1
+ :caption: Commits
+
+ Detailed list of commits <commits>
+
+
+Package apache-airflow-providers-apache-kafka
+------------------------------------------------------
+
+`Apache Kafka <https://kafka.apache.org/>`__
+
+
+Release: 1.0.0
+
+Provider package
+----------------
+
+This is a provider package for the ``apache.kafka`` provider. All classes for this provider
+package are in the ``airflow.providers.apache.kafka`` Python package.
+
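+A minimal usage sketch (the ``kafka_default`` connection and the topic name are illustrative;
+see the connection guide above for how to configure the connection):
+
+.. code-block:: python
+
+    from airflow.providers.apache.kafka.hooks.kafka import KafkaHook
+
+    # ``get_conn()`` returns a KafkaHookClient from which kafka-python
+    # producers and consumers can be created.
+    client = KafkaHook(kafka_conn_id="kafka_default").get_conn()
+    producer = client.create_producer()
+    producer.send("my_topic", b"hello from Airflow")
+    producer.flush()
+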
+Installation
+------------
+
+You can install this package on top of an existing Airflow 2.1+ installation via
+``pip install apache-airflow-providers-apache-kafka``
+
+PIP requirements
+----------------
+
+================== ==================
+PIP package Version required
+================== ==================
+``apache-airflow`` ``>=2.1.0``
+``kafka-python`` ``>=2.0.2``
+================== ==================
+
+.. include:: ../../airflow/providers/apache/kafka/CHANGELOG.rst
diff --git a/docs/apache-airflow-providers-apache-kafka/installing-providers-from-sources.rst b/docs/apache-airflow-providers-apache-kafka/installing-providers-from-sources.rst
new file mode 100644
index 0000000000000..1c90205d15b3a
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-kafka/installing-providers-from-sources.rst
@@ -0,0 +1,18 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. include:: ../installing-providers-from-sources.rst
diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst
index e281a45e85d1a..a58409aa34f65 100644
--- a/docs/apache-airflow/extra-packages-ref.rst
+++ b/docs/apache-airflow/extra-packages-ref.rst
@@ -126,6 +126,8 @@ custom bash/python providers).
+---------------------+-----------------------------------------------------+------------------------------------------------+
| apache.hive | ``pip install 'apache-airflow[apache.hive]'`` | All Hive related operators |
+---------------------+-----------------------------------------------------+------------------------------------------------+
+| apache.kafka | ``pip install 'apache-airflow[apache.kafka]'`` | All Kafka related operators & hooks |
++---------------------+-----------------------------------------------------+------------------------------------------------+
| apache.kylin | ``pip install 'apache-airflow[apache.kylin]'`` | All Kylin related operators & hooks |
+---------------------+-----------------------------------------------------+------------------------------------------------+
| apache.livy | ``pip install 'apache-airflow[apache.livy]'`` | All Livy related operators, hooks & sensors |
diff --git a/docs/integration-logos/apache/kafka.png b/docs/integration-logos/apache/kafka.png
new file mode 100644
index 0000000000000..8ead3a3845d7a
Binary files /dev/null and b/docs/integration-logos/apache/kafka.png differ
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index eb8f6d7795c27..577c2b3658494 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -219,6 +219,7 @@ Json
Jupyter
KEDA
KYLIN
+Kafka
Kalibrr
Kamil
Kerberized
@@ -981,6 +982,7 @@ json
jthomas
jupyter
jupytercmd
+kafka
keepalive
keepalives
kerberized
diff --git a/setup.py b/setup.py
index f5414fbafae05..f9836c30b2e2b 100644
--- a/setup.py
+++ b/setup.py
@@ -418,6 +418,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
jira = [
'JIRA>1.0.7',
]
+kafka = ['kafka-python>=2.0.2']
kerberos = [
'pykerberos>=1.1.13',
'requests_kerberos>=0.10.0',
@@ -680,6 +681,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
'apache.druid': druid,
'apache.hdfs': hdfs,
'apache.hive': hive,
+ 'apache.kafka': kafka,
'apache.kylin': kylin,
'apache.livy': http_provider,
'apache.pig': [],
diff --git a/tests/providers/apache/kafka/__init__.py b/tests/providers/apache/kafka/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/providers/apache/kafka/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/kafka/hooks/__init__.py b/tests/providers/apache/kafka/hooks/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/providers/apache/kafka/hooks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/kafka/hooks/test_kafka.py b/tests/providers/apache/kafka/hooks/test_kafka.py
new file mode 100644
index 0000000000000..1ba8d237fb846
--- /dev/null
+++ b/tests/providers/apache/kafka/hooks/test_kafka.py
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from airflow.providers.apache.kafka.hooks.kafka import KafkaHook, KafkaHookClient
+from tests.test_utils.config import env_vars
+
+
+class TestKafkaHook:
+ def test_client_attribute(self):
+ conn_uri = 'kafka://1.1.1.1:9092/PLAINTEXT'
+ with env_vars({'AIRFLOW_CONN_KAFKA_DEFAULT': conn_uri}):
+ hook = KafkaHook('kafka_default')
+ assert hasattr(hook, 'client')
+ assert isinstance(hook.client, KafkaHookClient)
+
+ def test_get_conn_url(self):
+ conn_uri = 'kafka://:XXXXX@1.1.1.1:9092/PLAINTEXT'
+ with env_vars({'AIRFLOW_CONN_KAFKA_DEFAULT': conn_uri}):
+ hook = KafkaHook('kafka_default')
+ assert hook.get_conn_url() == '1.1.1.1:9092'
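+
+    # A sketch of an additional check (assumption: the private ``_get_configs`` helper
+    # keeps mapping connection fields to kafka-python keyword arguments as above).
+    def test_get_configs_sasl_plain(self):
+        conn_uri = 'kafka://user:pass@1.1.1.1:9092/SASL_PLAINTEXT?sasl_mechanism=PLAIN'
+        with env_vars({'AIRFLOW_CONN_KAFKA_DEFAULT': conn_uri}):
+            configs = KafkaHook('kafka_default')._get_configs()
+            assert configs['security_protocol'] == 'SASL_PLAINTEXT'
+            assert configs['sasl_mechanism'] == 'PLAIN'
+            assert configs['sasl_plain_username'] == 'user'
+            assert configs['sasl_plain_password'] == 'pass'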